You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/03/05 01:50:10 UTC
[nifi-minifi-cpp] 01/04: MINIFICPP-2054 Periodically run rocksdb compaction
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7425983a118ef9d81e87e17b764f4ab57b44b2b7
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Sat Mar 4 23:33:02 2023 +0100
MINIFICPP-2054 Periodically run rocksdb compaction
Some SSTs seem to get stuck in some use cases, so this change implements
a periodically triggered manual compaction.
Closes #1516
Signed-off-by: Marton Szasz <sz...@apache.org>
---
CONFIGURE.md | 9 ++++
conf/minifi.properties | 4 ++
.../rocksdb-repos/DatabaseContentRepository.cpp | 43 +++++++++++++++-
.../rocksdb-repos/DatabaseContentRepository.h | 16 +++++-
extensions/rocksdb-repos/FlowFileRepository.cpp | 35 +++++++++++++
extensions/rocksdb-repos/FlowFileRepository.h | 9 ++++
extensions/rocksdb-repos/database/OpenRocksDb.cpp | 20 ++++----
extensions/rocksdb-repos/database/OpenRocksDb.h | 14 ++---
libminifi/include/core/ContentRepository.h | 3 ++
libminifi/include/properties/Configuration.h | 5 ++
libminifi/include/utils/StoppableThread.h | 60 ++++++++++++++++++++++
libminifi/src/Configuration.cpp | 2 +
libminifi/src/FlowController.cpp | 2 +
libminifi/src/utils/StoppableThread.cpp | 39 ++++++++++++++
.../rocksdb-tests/DBContentRepositoryTests.cpp | 38 ++++++++------
15 files changed, 261 insertions(+), 38 deletions(-)
diff --git a/CONFIGURE.md b/CONFIGURE.md
index c9beff2c1..06e66962d 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -171,6 +171,15 @@ If content repository or flow file repository is set to use the rocksdb database
nifi.flowfile.repository.rocksdb.compression=zlib
nifi.content.repository.rocksdb.compression=auto
+
+### Configuring compaction for rocksdb database
+
+Rocksdb has an option to run compaction at specific intervals not just when needed.
+
+ in minifi.properties
+ nifi.flowfile.repository.rocksdb.compaction.period=2 min
+ nifi.database.content.repository.rocksdb.compaction.period=2 min
+
#### Shared database
It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme.
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 884a03f34..1a041f294 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -34,6 +34,10 @@ nifi.provenance.repository.class.name=NoOpRepository
nifi.content.repository.class.name=DatabaseContentRepository
# nifi.content.repository.rocksdb.compression=auto
+## Relates to the internal workings of the rocksdb backend
+# nifi.flowfile.repository.rocksdb.compaction.period=2 min
+# nifi.database.content.repository.rocksdb.compaction.period=2 min
+
#nifi.remote.input.secure=true
#nifi.security.need.ClientAuth=
#nifi.security.client.certificate=
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 9beeb2498..020c60fca 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -21,6 +21,7 @@
#include <string>
#include <utility>
#include <vector>
+#include <cinttypes>
#include "encryption/RocksDbEncryptionProvider.h"
#include "RocksDbStream.h"
@@ -42,6 +43,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");
+ setCompactionPeriod(configuration);
+
auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
@@ -72,14 +75,52 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
return is_valid_;
}
+void DatabaseContentRepository::setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration) {
+ compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+ if (auto compaction_period_str = configuration->get(Configure::nifi_dbcontent_repository_rocksdb_compaction_period)) {
+ if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+ compaction_period_ = compaction_period->getMilliseconds();
+ if (compaction_period_.count() == 0) {
+ logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
+ }
+ } else {
+ logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
+ }
+ } else {
+ logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+ }
+}
+
+void DatabaseContentRepository::runCompaction() {
+ do {
+ if (auto opendb = db_->open()) {
+ auto status = opendb->RunCompaction();
+ logger_->log_trace("Compaction triggered: %s", status.ToString());
+ } else {
+ logger_->log_error("Failed to open database for compaction");
+ }
+ } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
+}
+
+void DatabaseContentRepository::start() {
+ if (!db_ || !is_valid_) {
+ return;
+ }
+ if (compaction_period_.count() != 0) {
+ compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+ runCompaction();
+ });
+ }
+}
+
void DatabaseContentRepository::stop() {
if (db_) {
auto opendb = db_->open();
if (opendb) {
opendb->FlushWAL(true);
}
+ compaction_thread_.reset();
}
- db_.reset();
}
DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 98f3acb79..7e1bac61c 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -19,6 +19,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <thread>
#include "core/ContentRepository.h"
#include "core/BufferedContentSession.h"
@@ -26,6 +27,7 @@
#include "core/Property.h"
#include "database/RocksDatabase.h"
#include "properties/Configure.h"
+#include "utils/StoppableThread.h"
namespace org::apache::nifi::minifi::core::repository {
@@ -37,6 +39,8 @@ class DatabaseContentRepository : public core::ContentRepository {
void commit() override;
};
+ static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
+
public:
static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.database.content.repository.encryption.key";
@@ -56,7 +60,6 @@ class DatabaseContentRepository : public core::ContentRepository {
std::shared_ptr<ContentSession> createSession() override;
bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
- void stop();
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
@@ -69,12 +72,21 @@ class DatabaseContentRepository : public core::ContentRepository {
void clearOrphans() override;
- private:
+ void start() override;
+ void stop() override;
+
+ protected:
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
+ void runCompaction();
+ void setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration);
+
bool is_valid_;
std::unique_ptr<minifi::internal::RocksDatabase> db_;
std::shared_ptr<logging::Logger> logger_;
+
+ std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD};
+ std::unique_ptr<utils::StoppableThread> compaction_thread_;
};
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 8637b4282..7568201e3 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -200,6 +200,8 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
}
logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
+ setCompactionPeriod(configure);
+
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
@@ -239,6 +241,22 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
}
}
+void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &configure) {
+ compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+ if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_rocksdb_compaction_period)) {
+ if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+ compaction_period_ = compaction_period->getMilliseconds();
+ if (compaction_period_.count() == 0) {
+ logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
+ }
+ } else {
+ logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
+ }
+ } else {
+ logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+ }
+}
+
bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
// persistent to the DB
auto opendb = db_->open();
@@ -281,15 +299,32 @@ bool FlowFileRepository::Get(const std::string &key, std::string &value) {
return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
}
+void FlowFileRepository::runCompaction() {
+ do {
+ if (auto opendb = db_->open()) {
+ auto status = opendb->RunCompaction();
+ logger_->log_trace("Compaction triggered: %s", status.ToString());
+ } else {
+ logger_->log_error("Failed to open database for compaction");
+ }
+ } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
+}
+
bool FlowFileRepository::start() {
const bool ret = ThreadedRepository::start();
if (swap_loader_) {
swap_loader_->start();
}
+ if (compaction_period_.count() != 0) {
+ compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+ runCompaction();
+ });
+ }
return ret;
}
bool FlowFileRepository::stop() {
+ compaction_thread_.reset();
if (swap_loader_) {
swap_loader_->stop();
}
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 832fde642..e90081221 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -38,6 +38,7 @@
#include "FlowFileLoader.h"
#include "range/v3/algorithm/all_of.hpp"
#include "utils/Literals.h"
+#include "utils/StoppableThread.h"
namespace org::apache::nifi::minifi::core::repository {
@@ -58,6 +59,8 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill
* Design: Extends Repository and implements the run function, using rocksdb as the primary substrate.
*/
class FlowFileRepository : public ThreadedRepository, public SwapManager {
+ static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
+
public:
static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key";
@@ -106,6 +109,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
private:
void run() override;
+ void runCompaction();
+ void setCompactionPeriod(const std::shared_ptr<Configure> &configure);
+
bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
void initialize_repository();
@@ -121,6 +127,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<minifi::Configure> config_;
std::thread thread_;
+
+ std::chrono::milliseconds compaction_period_;
+ std::unique_ptr<utils::StoppableThread> compaction_thread_;
};
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index 9bf2746f7..be5f1960c 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -22,11 +22,7 @@
#include "ColumnHandle.h"
#include "RocksDbInstance.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
OpenRocksDb::OpenRocksDb(RocksDbInstance& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl, gsl::not_null<std::shared_ptr<ColumnHandle>> column)
: db_(&db), impl_(std::move(impl)), column_(std::move(column)) {}
@@ -95,6 +91,14 @@ rocksdb::Status OpenRocksDb::FlushWAL(bool sync) {
return result;
}
+rocksdb::Status OpenRocksDb::RunCompaction() {
+ rocksdb::Status result = impl_->CompactRange(rocksdb::CompactRangeOptions{
+ .bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce
+ }, nullptr, nullptr);
+ handleResult(result);
+ return result;
+}
+
void OpenRocksDb::handleResult(const rocksdb::Status& result) {
if (result == rocksdb::Status::NoSpace()) {
db_->invalidate();
@@ -118,8 +122,4 @@ rocksdb::DB* OpenRocksDb::get() {
return impl_.get();
}
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h
index 33d26d50a..ebe25aacc 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -27,11 +27,7 @@
#include "rocksdb/utilities/checkpoint.h"
#include "WriteBatch.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
class RocksDbInstance;
struct ColumnHandle;
@@ -71,6 +67,8 @@ class OpenRocksDb {
rocksdb::Status FlushWAL(bool sync);
+ rocksdb::Status RunCompaction();
+
rocksdb::DB* get();
private:
@@ -82,8 +80,4 @@ class OpenRocksDb {
gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
};
-} // namespace internal
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi::internal
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 7d92634fb..ce5a0756c 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -53,6 +53,9 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
virtual void clearOrphans() = 0;
+ virtual void start() {}
+ virtual void stop() {}
+
protected:
std::string directory_;
std::mutex count_map_mutex_;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 978600bb3..c3b450325 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -69,6 +69,11 @@ class Configuration : public Properties {
static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
+
+ // these are internal properties related to the rocksdb backend
+ static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period";
+ static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
+
static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
static constexpr const char *nifi_sensitive_props_additional_keys = "nifi.sensitive.props.additional.keys";
diff --git a/libminifi/include/utils/StoppableThread.h b/libminifi/include/utils/StoppableThread.h
new file mode 100644
index 000000000..c21ca2633
--- /dev/null
+++ b/libminifi/include/utils/StoppableThread.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include <atomic>
+#include <functional>
+
+namespace org::apache::nifi::minifi::utils {
+
+// mimics some aspects of std::jthread
+// unfortunately clang's jthread support is lacking
+// TODO(adebreceni): replace this with std::jthread
+class StoppableThread {
+ public:
+ explicit StoppableThread(std::function<void()> fn);
+
+ void stopAndJoin() {
+ running_ = false;
+ {
+ std::unique_lock lock(mtx_);
+ cv_.notify_all();
+ }
+ if (thread_.joinable()) {
+ thread_.join();
+ }
+ }
+
+ ~StoppableThread() {
+ stopAndJoin();
+ }
+
+ // return true if stop was requested
+ static bool waitForStopRequest(std::chrono::milliseconds time);
+
+ private:
+ std::atomic_bool running_{true};
+ std::mutex mtx_;
+ std::condition_variable cv_;
+ std::thread thread_;
+};
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index c584cb574..ab4bc2670 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -54,6 +54,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default},
core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default},
core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default},
+ core::ConfigurationProperty{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_sensitive_props_additional_keys},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 9d0d62e15..a31e6536b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -210,6 +210,7 @@ int16_t FlowController::stop() {
}
this->flow_file_repo_->stop();
this->provenance_repo_->stop();
+ this->content_repo_->stop();
// stop the ControllerServices
disableAllControllerServices();
running_ = false;
@@ -385,6 +386,7 @@ int16_t FlowController::start() {
core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
running_ = true;
this->protocol_->start();
+ this->content_repo_->start();
this->provenance_repo_->start();
this->flow_file_repo_->start();
logger_->log_info("Started Flow Controller");
diff --git a/libminifi/src/utils/StoppableThread.cpp b/libminifi/src/utils/StoppableThread.cpp
new file mode 100644
index 000000000..8fea65a6d
--- /dev/null
+++ b/libminifi/src/utils/StoppableThread.cpp
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/StoppableThread.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+static thread_local StoppableThread* current_thread;
+
+StoppableThread::StoppableThread(std::function<void()> fn) {
+ thread_ = std::thread{[fn = std::move(fn), this] {
+ current_thread = this;
+ fn();
+ }};
+}
+
+bool StoppableThread::waitForStopRequest(std::chrono::milliseconds time) {
+ gsl_Expects(current_thread);
+ std::unique_lock lock(current_thread->mtx_);
+ // wait_for returns false if the predicate is still false, i.e. the thread is running
+ return current_thread->cv_.wait_for(lock, time, [&] {return !current_thread->running_;});
+}
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 942f1544e..ac84cd4b0 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -29,10 +29,18 @@
#include "../unit/ProvenanceTestHelper.h"
#include "../unit/ContentRepositoryDependentTests.h"
+class TestDatabaseContentRepository : public core::repository::DatabaseContentRepository {
+ public:
+ void invalidate() {
+ stop();
+ db_.reset();
+ }
+};
+
TEST_CASE("Write Claim", "[TestDBCR1]") {
TestController testController;
auto dir = testController.createTempDirectory();
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ auto content_repo = std::make_shared<TestDatabaseContentRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -46,11 +54,11 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
stream->close();
- content_repo->stop();
+ content_repo->invalidate();
// reclaim the memory
content_repo = nullptr;
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ content_repo = std::make_shared<TestDatabaseContentRepository>();
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -72,7 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
TEST_CASE("Delete Claim", "[TestDBCR2]") {
TestController testController;
auto dir = testController.createTempDirectory();
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ auto content_repo = std::make_shared<TestDatabaseContentRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -86,12 +94,12 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
stream->close();
- content_repo->stop();
+ content_repo->invalidate();
// reclaim the memory
content_repo = nullptr;
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ content_repo = std::make_shared<TestDatabaseContentRepository>();
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -110,7 +118,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
TestController testController;
auto dir = testController.createTempDirectory();
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ auto content_repo = std::make_shared<TestDatabaseContentRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -123,12 +131,12 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
stream->close();
- content_repo->stop();
+ content_repo->invalidate();
// reclaim the memory
content_repo = nullptr;
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ content_repo = std::make_shared<TestDatabaseContentRepository>();
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -145,7 +153,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
TestController testController;
auto dir = testController.createTempDirectory();
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ auto content_repo = std::make_shared<TestDatabaseContentRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -159,12 +167,12 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
stream->close();
- content_repo->stop();
+ content_repo->invalidate();
// reclaim the memory
content_repo = nullptr;
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ content_repo = std::make_shared<TestDatabaseContentRepository>();
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -185,7 +193,7 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
TestController testController;
auto dir = testController.createTempDirectory();
- auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ auto content_repo = std::make_shared<TestDatabaseContentRepository>();
auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -199,12 +207,12 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
stream->close();
- content_repo->stop();
+ content_repo->invalidate();
// reclaim the memory
content_repo = nullptr;
- content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+ content_repo = std::make_shared<TestDatabaseContentRepository>();
configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());