You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/17 00:21:49 UTC

[nifi-minifi-cpp] 03/04: MINIFICPP-2045 Synchronous flow file reloading

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7e8c5c49c0987bf3e2dcf1e0e16599c5cd7e69ec
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:14:07 2023 +0100

    MINIFICPP-2045 Synchronous flow file reloading
    
    ... and orphan content cleanup at startup
    
    Closes #1509
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 CONFIGURE.md                                       |   1 -
 conf/minifi.properties                             |   1 -
 .../rocksdb-repos/DatabaseContentRepository.cpp    |  33 ++++++
 .../rocksdb-repos/DatabaseContentRepository.h      |   2 +
 extensions/rocksdb-repos/FlowFileRepository.cpp    | 116 ++++----------------
 extensions/rocksdb-repos/FlowFileRepository.h      |  12 ---
 libminifi/include/core/ContentRepository.h         |   2 +
 .../include/core/repository/FileSystemRepository.h |   2 +
 .../core/repository/VolatileContentRepository.h    |   4 +
 libminifi/include/properties/Configuration.h       |   1 -
 libminifi/src/Configuration.cpp                    |   1 -
 .../src/core/repository/FileSystemRepository.cpp   |  13 +++
 libminifi/test/flow-tests/SessionTests.cpp         |   8 +-
 .../test/persistence-tests/PersistenceTests.cpp    |  10 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  39 +++++++
 libminifi/test/rocksdb-tests/EncryptionTests.cpp   |   9 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         | 120 ++++++++++++++++-----
 libminifi/test/unit/FileSystemRepositoryTests.cpp  |  24 +++++
 18 files changed, 242 insertions(+), 156 deletions(-)

diff --git a/CONFIGURE.md b/CONFIGURE.md
index a56504f96..304407ecb 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -160,7 +160,6 @@ folder. You may specify your own path in place of these defaults.
     in minifi.properties
     nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
     nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-    nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
     nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 
 #### Shared database
diff --git a/conf/minifi.properties b/conf/minifi.properties
index dca8e39fb..491a94016 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -28,7 +28,6 @@ nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repositor
 nifi.provenance.repository.max.storage.time=1 MIN
 nifi.provenance.repository.max.storage.size=1 MB
 nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
 nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
 nifi.provenance.repository.class.name=NoOpRepository
 nifi.content.repository.class.name=DatabaseContentRepository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 0924534f0..881de1085 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "RocksDbStream.h"
@@ -179,6 +180,52 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R
   return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
 }
 
+// Deletes every stored entry whose key has no live (non-zero) reference in count_map_.
+// Intended to run after the flow file repository has reloaded and registered all persisted claims.
+void DatabaseContentRepository::clearOrphans() {
+  if (!is_valid_ || !db_) {
+    logger_->log_error("Cannot delete orphan content entries, repository is invalid");
+    return;
+  }
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_error("Cannot delete orphan content entries, could not open repository");
+    return;
+  }
+  std::vector<std::string> keys_to_be_deleted;
+  {
+    // count_map_ is protected by count_map_mutex_
+    std::lock_guard<std::mutex> lock(count_map_mutex_);
+    auto it = opendb->NewIterator(rocksdb::ReadOptions());
+    for (it->SeekToFirst(); it->Valid(); it->Next()) {
+      auto key = it->key().ToString();
+      auto claim_it = count_map_.find(key);
+      if (claim_it == count_map_.end() || claim_it->second == 0) {
+        logger_->log_debug("Deleting orphan resource %s", key);
+        keys_to_be_deleted.push_back(std::move(key));
+      }
+    }
+    // Valid() == false means either end of data or an error; only status() tells them apart
+    if (!it->status().ok()) {
+      logger_->log_error("Could not iterate over rocksdb database to find orphan contents: %s", it->status().ToString());
+      return;
+    }
+  }
+  if (keys_to_be_deleted.empty()) {
+    return;
+  }
+  auto batch = opendb->createWriteBatch();
+  for (const auto& key : keys_to_be_deleted) {
+    batch.Delete(key);
+  }
+
+  rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
+
+  if (!status.ok()) {
+    logger_->log_error("Could not delete orphan contents from rocksdb database: %s", status.ToString());
+  }
+}
+
 REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 9961d2607..98f3acb79 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -67,6 +67,8 @@ class DatabaseContentRepository : public core::ContentRepository {
   bool remove(const minifi::ResourceClaim &claim) override;
   bool exists(const minifi::ResourceClaim &streamId) override;
 
+  void clearOrphans() override;
+
  private:
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 690a797a8..ecb9a1bcc 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -111,9 +111,6 @@ void FlowFileRepository::printStats() {
 
 void FlowFileRepository::run() {
   auto last = std::chrono::steady_clock::now();
-  if (isRunning()) {
-    prune_stored_flowfiles();
-  }
   while (isRunning()) {
     std::this_thread::sleep_for(purge_period_);
     flush();
@@ -126,38 +123,29 @@ void FlowFileRepository::run() {
   flush();
 }
 
-void FlowFileRepository::prune_stored_flowfiles() {
-  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME});
-  logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? "encrypted" : "plaintext");
-
-  auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
-    db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
-    db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
-    if (encrypted_env) {
-      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
-    } else {
-      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
-    }
-  };
-  auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly);
-  std::optional<minifi::internal::OpenRocksDb> opendb;
-  if (nullptr != checkpoint_) {
-    opendb = checkpointDB->open();
-    if (opendb) {
-      logger_->log_trace("Successfully opened checkpoint database at '%s'", checkpoint_dir_.string());
-    } else {
-      logger_->log_error("Couldn't open checkpoint database at '%s' using live database", checkpoint_dir_.string());
-      opendb = db_->open();
-    }
-    if (!opendb) {
-      logger_->log_trace("Could not open neither the checkpoint nor the live database.");
-      return;
+bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
+  constexpr int RETRY_COUNT = 3;
+  std::chrono::milliseconds wait_time = 0ms;
+  for (int i=0; i < RETRY_COUNT; ++i) {
+    auto status = operation();
+    if (status.ok()) {
+      logger_->log_trace("Rocksdb operation executed successfully");
+      return true;
     }
-  } else {
-    logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error.");
+    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
+    wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
+    std::this_thread::sleep_for(wait_time);
+  }
+  return false;
+}
+
+void FlowFileRepository::initialize_repository() {
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_trace("Couldn't open database to load existing flow files");
     return;
   }
+  logger_->log_info("Reading existing flow files from database");
 
   auto it = opendb->NewIterator(rocksdb::ReadOptions());
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -191,62 +179,8 @@ void FlowFileRepository::prune_stored_flowfiles() {
       keys_to_delete.enqueue(key);
     }
   }
-}
-
-bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
-  std::chrono::milliseconds waitTime = 0ms;
-  for (int i=0; i < 3; ++i) {
-    auto status = operation();
-    if (status.ok()) {
-      logger_->log_trace("Rocksdb operation executed successfully");
-      return true;
-    }
-    logger_->log_error("Rocksdb operation failed: %s", status.ToString());
-    waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
-    std::this_thread::sleep_for(waitTime);
-  }
-  return false;
-}
-
-/**
- * Returns True if there is data to interrogate.
- * @return true if our db has data stored.
- */
-bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& opendb) {
-  auto it = opendb.NewIterator(rocksdb::ReadOptions());
-  it->SeekToFirst();
-  return it->Valid();
-}
-void FlowFileRepository::initialize_repository() {
-  checkpoint_.reset();
-  auto opendb = db_->open();
-  if (!opendb) {
-    logger_->log_trace("Couldn't open database, no way to checkpoint");
-    return;
-  }
-  // first we need to establish a checkpoint iff it is needed.
-  if (!need_checkpoint(*opendb)) {
-    logger_->log_trace("Do not need checkpoint");
-    return;
-  }
-  // delete any previous copy
-  if (utils::file::delete_dir(checkpoint_dir_) < 0) {
-    logger_->log_error("Could not delete existing checkpoint directory '%s'", checkpoint_dir_.string());
-    return;
-  }
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint;
-  rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not create checkpoint object: %s", checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string());
-  if (!checkpoint_status.ok()) {
-    logger_->log_error("Could not initialize checkpoint: %s", checkpoint_status.ToString());
-    return;
-  }
-  checkpoint_ = std::move(checkpoint);
-  logger_->log_trace("Created checkpoint in directory '%s'", checkpoint_dir_.string());
+  flush();
+  content_repo_->clearOrphans();
 }
 
 void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
@@ -266,12 +200,6 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   }
   logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
 
-  value.clear();
-  if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) {
-    checkpoint_dir_ = value;
-  }
-  logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string());
-
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index aeda5af9c..832fde642 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -66,13 +66,11 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
   }
 
   explicit FlowFileRepository(const std::string& repo_name = "",
-                              std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
                               std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
                               std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
                               int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
                               std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
     : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
-      checkpoint_dir_(std::move(checkpoint_dir)),
       logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
   }
 
@@ -112,23 +110,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
 
   void initialize_repository();
 
-  /**
-   * Returns true if a checkpoint is needed at startup
-   * @return true if a checkpoint is needed.
-   */
-  static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
-
-  void prune_stored_flowfiles();
-
   std::thread& getThread() override {
     return thread_;
   }
 
-  std::filesystem::path checkpoint_dir_;
   moodycamel::ConcurrentQueue<std::string> keys_to_delete;
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::unique_ptr<minifi::internal::RocksDatabase> db_;
-  std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
   std::unique_ptr<FlowFileLoader> swap_loader_;
   std::shared_ptr<logging::Logger> logger_;
   std::shared_ptr<minifi::Configure> config_;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 84ac4eded..7d92634fb 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -51,6 +51,13 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
   void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
   StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) override;
 
+  /**
+   * Deletes persisted content that has no live (non-zero) reference in count_map_.
+   * Only call this once every persisted flow file has been loaded and its claim registered,
+   * otherwise content that is still referenced on disk would be deleted.
+   */
+  virtual void clearOrphans() = 0;
+
  protected:
   std::string directory_;
   std::mutex count_map_mutex_;
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 66cec39ef..01926f5bd 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -49,6 +49,8 @@ class FileSystemRepository : public core::ContentRepository {
   bool remove(const minifi::ResourceClaim& claim) override;
   std::shared_ptr<ContentSession> createSession() override;
 
+  void clearOrphans() override;
+
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 54c1dcc6f..62e9fd80b 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -94,6 +94,10 @@ class VolatileContentRepository : public core::ContentRepository {
    */
   bool remove(const minifi::ResourceClaim &claim) override;
 
+  void clearOrphans() override {
+    // there are no persisted orphans to delete
+  }
+
  private:
   VolatileRepositoryData repo_data_;
   bool minimize_locking_;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index eb2f598dc..25e6b5ebc 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -66,7 +66,6 @@ class Configuration : public Properties {
   static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
   static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
   static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
-  static constexpr const char *nifi_flowfile_checkpoint_directory_default = "nifi.flowfile.checkpoint.directory.default";
   static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
   static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
   static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 69b24e9e8..78ae790f7 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -51,7 +51,6 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default},
-  core::ConfigurationProperty{Configuration::nifi_flowfile_checkpoint_directory_default},
   core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default},
   core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
   core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index b41cbcfaf..034e9c36a 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -59,4 +59,20 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
   return std::make_shared<ForwardingContentSession>(sharedFromThis());
 }
 
+// Deletes every file in directory_ whose path has no live (non-zero) reference in count_map_.
+void FileSystemRepository::clearOrphans() {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) {
+    auto path = directory_ + "/" + filename.string();
+    auto it = count_map_.find(path);
+    if (it == count_map_.end() || it->second == 0) {
+      logger_->log_debug("Deleting orphan resource %s", path);
+      if (std::remove(path.c_str()) != 0) {
+        logger_->log_warn("Could not delete orphan resource %s", path);
+      }
+    }
+    return true;
+  }, logger_, false);
+}
+
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
index 59cd8e26b..b722e93b1 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -46,12 +46,6 @@ class TestProcessor : public minifi::core::Processor {
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 };
 
-#ifdef WIN32
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = ".\\sessiontest_flowfile_checkpoint";
-#else
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = "./sessiontest_flowfile_checkpoint";
-#endif
-
 TEST_CASE("Import null data") {
   TestController testController;
   LogTestController::getInstance().setDebug<core::ContentRepository>();
@@ -69,7 +63,7 @@ TEST_CASE("Import null data") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = core::createRepository("nooprepository");
-  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", SESSIONTEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
     testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 530aa06c6..e417ae417 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -53,12 +53,6 @@ class TestProcessor : public minifi::core::Processor {
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 };
 
-#ifdef WIN32
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = ".\\persistencetest_flowfile_checkpoint";
-#else
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = "./persistencetest_flowfile_checkpoint";
-#endif
-
 struct TestFlow{
   TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
         const std::function<std::unique_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
@@ -179,7 +173,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -286,7 +280,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+  std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo;
   SECTION("VolatileContentRepository") {
     testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 0a50cd129..942f1544e 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -242,3 +242,45 @@ TEST_CASE("ProcessSession::append should append to the flowfile and set its size
 TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (RocksDB)", "[zerolengthread]") {
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
 }
+
+// Opens the RocksDB database at `dir` and returns the number of keys it holds.
+static size_t getDbSize(const std::filesystem::path& dir) {
+  auto db = minifi::internal::RocksDatabase::create({}, {}, dir.string());
+  auto opendb = db->open();
+  REQUIRE(opendb);
+
+  size_t count = 0;
+  auto it = opendb->NewIterator({});
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    ++count;
+  }
+  return count;
+}
+
+TEST_CASE("DBContentRepository can clear orphan entries") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
+  {
+    auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  // the entry outlived the first repository instance
+  REQUIRE(getDbSize(dir) == 1);
+
+  {
+    // a fresh repository has no references registered, so every stored entry is an orphan
+    auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+    content_repo->clearOrphans();
+  }
+
+  REQUIRE(getDbSize(dir) == 0);
+}
diff --git a/libminifi/test/rocksdb-tests/EncryptionTests.cpp b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
index 2fff68980..b544d7016 100644
--- a/libminifi/test/rocksdb-tests/EncryptionTests.cpp
+++ b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
@@ -33,7 +33,6 @@ class FFRepoFixture : public TestController {
     LogTestController::getInstance().setTrace<FlowFileRepository>();
     home_ = createTempDirectory();
     repo_dir_ = home_ / "flowfile_repo";
-    checkpoint_dir_ = home_ / "checkpoint_dir";
     config_ = std::make_shared<minifi::Configure>();
     config_->setHome(home_);
     container_ = std::make_unique<minifi::Connection>(nullptr, nullptr, "container");
@@ -50,7 +49,7 @@ class FFRepoFixture : public TestController {
 
   template<typename Fn>
   void runWithNewRepository(Fn&& fn) {
-    auto repository = std::make_shared<FlowFileRepository>("ff", checkpoint_dir_, repo_dir_.string());
+    auto repository = std::make_shared<FlowFileRepository>("ff", repo_dir_.string());
     repository->initialize(config_);
     std::map<std::string, core::Connectable*> container_map;
     container_map[container_->getUUIDStr()] = container_.get();
@@ -65,7 +64,6 @@ class FFRepoFixture : public TestController {
   std::unique_ptr<minifi::Connection> container_;
   std::filesystem::path home_;
   std::filesystem::path repo_dir_;
-  std::filesystem::path checkpoint_dir_;
   std::shared_ptr<minifi::Configure> config_;
   std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
 };
@@ -93,14 +91,11 @@ TEST_CASE_METHOD(FFRepoFixture, "FlowFileRepository creates checkpoint and loads
   REQUIRE(container_->isEmpty());
 
   runWithNewRepository([&] (const std::shared_ptr<core::repository::FlowFileRepository>& /*repo*/) {
-    // wait for the flowfiles to be loaded from the checkpoint
+    // wait for the flowfiles to be loaded
     bool success = utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] {
       return !container_->isEmpty();
     });
     REQUIRE(success);
-    REQUIRE(utils::verifyLogLinePresenceInPollTime(
-        std::chrono::seconds{5},
-        "Successfully opened checkpoint database at '" + checkpoint_dir_.string() + "'"));
     std::set<std::shared_ptr<core::FlowFile>> expired;
     auto flowfile = container_->poll(expired);
     REQUIRE(expired.empty());
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index e280c0432..ef4d073bf 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -39,12 +39,6 @@ using namespace std::literals::chrono_literals;
 
 namespace {
 
-#ifdef WIN32
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = ".\\repotest_flowfile_checkpoint";
-#else
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = "./repotest_flowfile_checkpoint";
-#endif
-
 namespace {
 class TestProcessor : public minifi::core::Processor {
  public:
@@ -72,7 +66,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<minifi::Configure>());
 
@@ -83,8 +77,6 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
 
   REQUIRE(true == file->Persist(repository));
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   repository->stop();
 }
 
@@ -94,7 +86,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<minifi::Configure>());
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
@@ -106,8 +98,6 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
 
   REQUIRE(true == file->Persist(repository));
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   repository->stop();
 }
 
@@ -117,7 +107,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
   LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
   TestController testController;
   auto dir = testController.createTempDirectory();
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
 
@@ -154,8 +144,6 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
 
   REQUIRE(record2->getAttribute("keyB", value));
   REQUIRE(value.empty());
-
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
 }
 
 TEST_CASE("Test Delete Content ", "[TestFFR4]") {
@@ -167,7 +155,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -203,14 +191,11 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
   std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
   REQUIRE(!fileopen.good());
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   LogTestController::getInstance().reset();
 }
 
 TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   TestController testController;
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
 
   LogTestController::getInstance().setDebug<core::ContentRepository>();
   LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
@@ -219,7 +204,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
 
   auto dir = testController.createTempDirectory();
 
-  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+  std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
 
   std::fstream file;
   file.open(dir / "tstFile.ext", std::ios::out);
@@ -264,8 +249,6 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
   REQUIRE(fileopen.fail());
 
-  utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
   LogTestController::getInstance().reset();
 }
 
@@ -284,7 +267,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
   config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
 
   std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
-  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", REPOTEST_FLOWFILE_CHECKPOINT_DIR);
+  auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
   std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
   ff_repository->initialize(config);
   content_repo->initialize(config);
@@ -361,7 +344,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
   class TestFlowFileRepository: public core::repository::FlowFileRepository{
    public:
     explicit TestFlowFileRepository(const std::string& name)
-      : FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+      : FlowFileRepository(name, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
                            10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
 
     void flush() override {
@@ -438,4 +421,93 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
   }
 }
 
+TEST_CASE("FlowFileRepository triggers content repo orphan clear", "[TestFFR8]") {
+  TestController testController;  // created before setting log levels: its setup may reset them
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  auto ff_dir = testController.createTempDirectory();
+  auto content_dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());
+  // write content that stays on disk without being referenced by any flow file (an orphan)
+  {
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).size() == 1);
+  // loading the content repo into a fresh (empty) FlowFileRepository is expected to remove the orphan
+  auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+  REQUIRE(ff_repo->initialize(config));
+  auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  REQUIRE(content_repo->initialize(config));
+
+  ff_repo->loadComponent(content_repo);
+
+  REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).empty());
+}
+
+TEST_CASE("FlowFileRepository synchronously pushes existing flow files", "[TestFFR9]") {
+  TestController testController;  // created before setting log levels: its setup may reset them
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+  auto ff_dir = testController.createTempDirectory();
+  auto content_dir = testController.createTempDirectory();
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());
+
+  // the flow file is persisted while attached to connection_id and must reappear in a connection with the same id
+  utils::Identifier ff_id;
+  auto connection_id = utils::IdGenerator::getIdGenerator()->generate();
+  // phase 1: persist a single flow file (with content) through the FlowFileRepository
+  {
+    auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+    REQUIRE(ff_repo->initialize(config));
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+    auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
+
+    std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>> flow_data;
+    auto ff = std::make_shared<minifi::FlowFileRecord>();
+    ff_id = ff->getUUID();
+    ff->setConnection(conn.get());
+    content_repo->write(*claim)->write("hello");
+    ff->setResourceClaim(claim);
+    auto stream = std::make_unique<minifi::io::BufferStream>();
+    ff->Serialize(*stream);
+    flow_data.emplace_back(ff->getUUIDStr(), std::move(stream));
+
+    REQUIRE(ff_repo->MultiPut(flow_data));
+  }
+  // phase 2: fresh repository instances on the same directories, as after a restart
+  {
+    auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+    REQUIRE(ff_repo->initialize(config));
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(config));
+    auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+    ff_repo->setConnectionMap({{connection_id.to_string(), conn.get()}});
+    ff_repo->loadComponent(content_repo);
+    // no waiting: the restored flow file is expected to be in the connection once loadComponent returns
+    std::set<std::shared_ptr<core::FlowFile>> expired;
+    std::shared_ptr<core::FlowFile> ff = conn->poll(expired);
+    REQUIRE(expired.empty());
+    REQUIRE(ff);
+    REQUIRE(ff->getUUID() == ff_id);
+  }
+}
+
 }  // namespace
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
index b49fe60c5..c61579dd9 100644
--- a/libminifi/test/unit/FileSystemRepositoryTests.cpp
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -58,3 +58,27 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
       return end_memory < start_memory + int64_t{5_MB};
     }, 100ms));
 }
+
+TEST_CASE("FileSystemRepository can clear orphan entries", "[testfilesystemrepositoryorphans]") {
+  TestController testController;
+  auto dir = testController.createTempDirectory();
+  auto configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
+  {  // the first repository instance only lives until the end of this scope
+    auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+    REQUIRE(content_repo->initialize(configuration));
+    // write content that stays on disk without being referenced by any flow file (an orphan)
+    minifi::ResourceClaim claim(content_repo);
+    content_repo->write(claim)->write("hi");
+    // ensure that the content is not deleted during resource claim destruction
+    content_repo->incrementStreamCount(claim);
+  }
+
+  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);
+  // clearOrphans() on a new repository instance over the same directory is expected to delete it
+  auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  REQUIRE(content_repo->initialize(configuration));
+  content_repo->clearOrphans();
+
+  REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty());
+}