You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/03/05 01:50:10 UTC

[nifi-minifi-cpp] 01/04: MINIFICPP-2054 Periodically run rocksdb compaction

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7425983a118ef9d81e87e17b764f4ab57b44b2b7
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Sat Mar 4 23:33:02 2023 +0100

    MINIFICPP-2054 Periodically run rocksdb compaction
    
    Some SSTs seem to get stuck in some use cases, so this change implements
    a periodically triggered manual compaction.
    
    Closes #1516
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 CONFIGURE.md                                       |  9 ++++
 conf/minifi.properties                             |  4 ++
 .../rocksdb-repos/DatabaseContentRepository.cpp    | 43 +++++++++++++++-
 .../rocksdb-repos/DatabaseContentRepository.h      | 16 +++++-
 extensions/rocksdb-repos/FlowFileRepository.cpp    | 35 +++++++++++++
 extensions/rocksdb-repos/FlowFileRepository.h      |  9 ++++
 extensions/rocksdb-repos/database/OpenRocksDb.cpp  | 20 ++++----
 extensions/rocksdb-repos/database/OpenRocksDb.h    | 14 ++---
 libminifi/include/core/ContentRepository.h         |  3 ++
 libminifi/include/properties/Configuration.h       |  5 ++
 libminifi/include/utils/StoppableThread.h          | 60 ++++++++++++++++++++++
 libminifi/src/Configuration.cpp                    |  2 +
 libminifi/src/FlowController.cpp                   |  2 +
 libminifi/src/utils/StoppableThread.cpp            | 39 ++++++++++++++
 .../rocksdb-tests/DBContentRepositoryTests.cpp     | 38 ++++++++------
 15 files changed, 261 insertions(+), 38 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index c9beff2c1..06e66962d 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -171,6 +171,15 @@ If content repository or flow file repository is set to use the rocksdb database
      nifi.flowfile.repository.rocksdb.compression=zlib
      nifi.content.repository.rocksdb.compression=auto
 
+
+### Configuring compaction for rocksdb database
+
+Rocksdb has an option to run compaction at specific intervals not just when needed.
+
+     in minifi.properties
+     nifi.flowfile.repository.rocksdb.compaction.period=2 min
+     nifi.database.content.repository.rocksdb.compaction.period=2 min
+
 #### Shared database
 
 It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme.
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 884a03f34..1a041f294 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -34,6 +34,10 @@ nifi.provenance.repository.class.name=NoOpRepository
 nifi.content.repository.class.name=DatabaseContentRepository
 # nifi.content.repository.rocksdb.compression=auto
 
+## Relates to the internal workings of the rocksdb backend
+# nifi.flowfile.repository.rocksdb.compaction.period=2 min
+# nifi.database.content.repository.rocksdb.compaction.period=2 min
+
 #nifi.remote.input.secure=true
 #nifi.security.need.ClientAuth=
 #nifi.security.client.certificate=
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 9beeb2498..020c60fca 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -21,6 +21,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <cinttypes>
 
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "RocksDbStream.h"
@@ -42,6 +43,8 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");
 
+  setCompactionPeriod(configuration);
+
   auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
     db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
     db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
@@ -72,14 +75,52 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
   return is_valid_;
 }
 
+void DatabaseContentRepository::setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration) {
+  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+  if (auto compaction_period_str = configuration->get(Configure::nifi_dbcontent_repository_rocksdb_compaction_period)) {
+    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+      compaction_period_ = compaction_period->getMilliseconds();
+      if (compaction_period_.count() == 0) {
+        logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
+      }
+    } else {
+      logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
+    }
+  } else {
+    logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+  }
+}
+
+void DatabaseContentRepository::runCompaction() {
+  do {
+    if (auto opendb = db_->open()) {
+      auto status = opendb->RunCompaction();
+      logger_->log_trace("Compaction triggered: %s", status.ToString());
+    } else {
+      logger_->log_error("Failed to open database for compaction");
+    }
+  } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
+}
+
+void DatabaseContentRepository::start() {
+  if (!db_ || !is_valid_) {
+    return;
+  }
+  if (compaction_period_.count() != 0) {
+    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+      runCompaction();
+    });
+  }
+}
+
 void DatabaseContentRepository::stop() {
   if (db_) {
     auto opendb = db_->open();
     if (opendb) {
       opendb->FlushWAL(true);
     }
+    compaction_thread_.reset();
   }
-  db_.reset();
 }
 
 DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository) : BufferedContentSession(std::move(repository)) {}
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 98f3acb79..7e1bac61c 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <thread>
 
 #include "core/ContentRepository.h"
 #include "core/BufferedContentSession.h"
@@ -26,6 +27,7 @@
 #include "core/Property.h"
 #include "database/RocksDatabase.h"
 #include "properties/Configure.h"
+#include "utils/StoppableThread.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -37,6 +39,8 @@ class DatabaseContentRepository : public core::ContentRepository {
     void commit() override;
   };
 
+  static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
+
  public:
   static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.database.content.repository.encryption.key";
 
@@ -56,7 +60,6 @@ class DatabaseContentRepository : public core::ContentRepository {
 
   std::shared_ptr<ContentSession> createSession() override;
   bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
-  void stop();
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
   std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
 
@@ -69,12 +72,21 @@ class DatabaseContentRepository : public core::ContentRepository {
 
   void clearOrphans() override;
 
- private:
+  void start() override;
+  void stop() override;
+
+ protected:
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
 
+  void runCompaction();
+  void setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration);
+
   bool is_valid_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
   std::shared_ptr<logging::Logger> logger_;
+
+  std::chrono::milliseconds compaction_period_{DEFAULT_COMPACTION_PERIOD};
+  std::unique_ptr<utils::StoppableThread> compaction_thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 8637b4282..7568201e3 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -200,6 +200,8 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   }
   logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
 
+  setCompactionPeriod(configure);
+
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
 
@@ -239,6 +241,22 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   }
 }
 
+void FlowFileRepository::setCompactionPeriod(const std::shared_ptr<Configure> &configure) {
+  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+  if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_rocksdb_compaction_period)) {
+    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+      compaction_period_ = compaction_period->getMilliseconds();
+      if (compaction_period_.count() == 0) {
+        logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
+      }
+    } else {
+      logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_flowfile_repository_rocksdb_compaction_period);
+    }
+  } else {
+    logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+  }
+}
+
 bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
   // persistent to the DB
   auto opendb = db_->open();
@@ -281,15 +299,32 @@ bool FlowFileRepository::Get(const std::string &key, std::string &value) {
   return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
 }
 
+void FlowFileRepository::runCompaction() {
+  do {
+    if (auto opendb = db_->open()) {
+      auto status = opendb->RunCompaction();
+      logger_->log_trace("Compaction triggered: %s", status.ToString());
+    } else {
+      logger_->log_error("Failed to open database for compaction");
+    }
+  } while (!utils::StoppableThread::waitForStopRequest(compaction_period_));
+}
+
 bool FlowFileRepository::start() {
   const bool ret = ThreadedRepository::start();
   if (swap_loader_) {
     swap_loader_->start();
   }
+  if (compaction_period_.count() != 0) {
+    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] () {
+      runCompaction();
+    });
+  }
   return ret;
 }
 
 bool FlowFileRepository::stop() {
+  compaction_thread_.reset();
   if (swap_loader_) {
     swap_loader_->stop();
   }
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 832fde642..e90081221 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -38,6 +38,7 @@
 #include "FlowFileLoader.h"
 #include "range/v3/algorithm/all_of.hpp"
 #include "utils/Literals.h"
+#include "utils/StoppableThread.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
@@ -58,6 +59,8 @@ constexpr auto FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS = std::chrono::mill
  * Design: Extends Repository and implements the run function, using rocksdb as the primary substrate.
  */
 class FlowFileRepository : public ThreadedRepository, public SwapManager {
+  static constexpr std::chrono::milliseconds DEFAULT_COMPACTION_PERIOD = std::chrono::minutes{2};
+
  public:
   static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key";
 
@@ -106,6 +109,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
  private:
   void run() override;
 
+  void runCompaction();
+  void setCompactionPeriod(const std::shared_ptr<Configure> &configure);
+
   bool ExecuteWithRetry(const std::function<rocksdb::Status()>& operation);
 
   void initialize_repository();
@@ -121,6 +127,9 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
   std::thread thread_;
+
+  std::chrono::milliseconds compaction_period_;
+  std::unique_ptr<utils::StoppableThread> compaction_thread_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.cpp b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
index 9bf2746f7..be5f1960c 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.cpp
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.cpp
@@ -22,11 +22,7 @@
 #include "ColumnHandle.h"
 #include "RocksDbInstance.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 OpenRocksDb::OpenRocksDb(RocksDbInstance& db, gsl::not_null<std::shared_ptr<rocksdb::DB>> impl, gsl::not_null<std::shared_ptr<ColumnHandle>> column)
     : db_(&db), impl_(std::move(impl)), column_(std::move(column)) {}
@@ -95,6 +91,14 @@ rocksdb::Status OpenRocksDb::FlushWAL(bool sync) {
   return result;
 }
 
+rocksdb::Status OpenRocksDb::RunCompaction() {
+  rocksdb::Status result = impl_->CompactRange(rocksdb::CompactRangeOptions{
+    .bottommost_level_compaction = rocksdb::BottommostLevelCompaction::kForce
+  }, nullptr, nullptr);
+  handleResult(result);
+  return result;
+}
+
 void OpenRocksDb::handleResult(const rocksdb::Status& result) {
   if (result == rocksdb::Status::NoSpace()) {
     db_->invalidate();
@@ -118,8 +122,4 @@ rocksdb::DB* OpenRocksDb::get() {
   return impl_.get();
 }
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/extensions/rocksdb-repos/database/OpenRocksDb.h b/extensions/rocksdb-repos/database/OpenRocksDb.h
index 33d26d50a..ebe25aacc 100644
--- a/extensions/rocksdb-repos/database/OpenRocksDb.h
+++ b/extensions/rocksdb-repos/database/OpenRocksDb.h
@@ -27,11 +27,7 @@
 #include "rocksdb/utilities/checkpoint.h"
 #include "WriteBatch.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 class RocksDbInstance;
 struct ColumnHandle;
@@ -71,6 +67,8 @@ class OpenRocksDb {
 
   rocksdb::Status FlushWAL(bool sync);
 
+  rocksdb::Status RunCompaction();
+
   rocksdb::DB* get();
 
  private:
@@ -82,8 +80,4 @@ class OpenRocksDb {
   gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
 };
 
-}  // namespace internal
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::internal
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 7d92634fb..ce5a0756c 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -53,6 +53,9 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
 
   virtual void clearOrphans() = 0;
 
+  virtual void start() {}
+  virtual void stop() {}
+
  protected:
   std::string directory_;
   std::mutex count_map_mutex_;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 978600bb3..c3b450325 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -69,6 +69,11 @@ class Configuration : public Properties {
   static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
   static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
   static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
+
+  // these are internal properties related to the rocksdb backend
+  static constexpr const char *nifi_flowfile_repository_rocksdb_compaction_period = "nifi.flowfile.repository.rocksdb.compaction.period";
+  static constexpr const char *nifi_dbcontent_repository_rocksdb_compaction_period = "nifi.database.content.repository.rocksdb.compaction.period";
+
   static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
   static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
   static constexpr const char *nifi_sensitive_props_additional_keys = "nifi.sensitive.props.additional.keys";
diff --git a/libminifi/include/utils/StoppableThread.h b/libminifi/include/utils/StoppableThread.h
new file mode 100644
index 000000000..c21ca2633
--- /dev/null
+++ b/libminifi/include/utils/StoppableThread.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include <atomic>
+#include <functional>
+
+namespace org::apache::nifi::minifi::utils {
+
+// mimics some aspects of std::jthread
+// unfortunately clang's jthread support is lacking
+// TODO(adebreceni): replace this with std::jthread
+class StoppableThread {
+ public:
+  explicit StoppableThread(std::function<void()> fn);
+
+  void stopAndJoin() {
+    running_ = false;
+    {
+      std::unique_lock lock(mtx_);
+      cv_.notify_all();
+    }
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
+  ~StoppableThread() {
+    stopAndJoin();
+  }
+
+  // return true if stop was requested
+  static bool waitForStopRequest(std::chrono::milliseconds time);
+
+ private:
+  std::atomic_bool running_{true};
+  std::mutex mtx_;
+  std::condition_variable cv_;
+  std::thread thread_;
+};
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index c584cb574..ab4bc2670 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -54,6 +54,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default},
+  core::ConfigurationProperty{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+  core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_sensitive_props_additional_keys},
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 9d0d62e15..a31e6536b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -210,6 +210,7 @@ int16_t FlowController::stop() {
     }
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
+    this->content_repo_->stop();
     // stop the ControllerServices
     disableAllControllerServices();
     running_ = false;
@@ -385,6 +386,7 @@ int16_t FlowController::start() {
       core::logging::LoggerConfiguration::getConfiguration().initializeAlertSinks(this, configuration_);
       running_ = true;
       this->protocol_->start();
+      this->content_repo_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
       logger_->log_info("Started Flow Controller");
diff --git a/libminifi/src/utils/StoppableThread.cpp b/libminifi/src/utils/StoppableThread.cpp
new file mode 100644
index 000000000..8fea65a6d
--- /dev/null
+++ b/libminifi/src/utils/StoppableThread.cpp
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/StoppableThread.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+static thread_local StoppableThread* current_thread;
+
+StoppableThread::StoppableThread(std::function<void()> fn) {
+  thread_ = std::thread{[fn = std::move(fn), this] {
+    current_thread = this;
+    fn();
+  }};
+}
+
+bool StoppableThread::waitForStopRequest(std::chrono::milliseconds time) {
+  gsl_Expects(current_thread);
+  std::unique_lock lock(current_thread->mtx_);
+  // wait_for returns false if the predicate is still false, i.e. the thread is running
+  return current_thread->cv_.wait_for(lock, time, [&] {return !current_thread->running_;});
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 942f1544e..ac84cd4b0 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -29,10 +29,18 @@
 #include "../unit/ProvenanceTestHelper.h"
 #include "../unit/ContentRepositoryDependentTests.h"
 
+class TestDatabaseContentRepository : public core::repository::DatabaseContentRepository {
+ public:
+  void invalidate() {
+    stop();
+    db_.reset();
+  }
+};
+
 TEST_CASE("Write Claim", "[TestDBCR1]") {
   TestController testController;
   auto dir = testController.createTempDirectory();
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -46,11 +54,11 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
 
   stream->close();
 
-  content_repo->stop();
+  content_repo->invalidate();
   // reclaim the memory
   content_repo = nullptr;
 
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -72,7 +80,7 @@ TEST_CASE("Write Claim", "[TestDBCR1]") {
 TEST_CASE("Delete Claim", "[TestDBCR2]") {
   TestController testController;
   auto dir = testController.createTempDirectory();
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -86,12 +94,12 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   stream->close();
 
-  content_repo->stop();
+  content_repo->invalidate();
 
   // reclaim the memory
   content_repo = nullptr;
 
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -110,7 +118,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
   TestController testController;
   auto dir = testController.createTempDirectory();
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -123,12 +131,12 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
 
   stream->close();
 
-  content_repo->stop();
+  content_repo->invalidate();
 
   // reclaim the memory
   content_repo = nullptr;
 
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -145,7 +153,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
   TestController testController;
   auto dir = testController.createTempDirectory();
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -159,12 +167,12 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
 
   stream->close();
 
-  content_repo->stop();
+  content_repo->invalidate();
 
   // reclaim the memory
   content_repo = nullptr;
 
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -185,7 +193,7 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
 TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
   TestController testController;
   auto dir = testController.createTempDirectory();
-  auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  auto content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
@@ -199,12 +207,12 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
   stream->close();
 
-  content_repo->stop();
+  content_repo->invalidate();
 
   // reclaim the memory
   content_repo = nullptr;
 
-  content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+  content_repo = std::make_shared<TestDatabaseContentRepository>();
 
   configuration = std::make_shared<org::apache::nifi::minifi::Configure>();
   configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());