You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/17 00:21:49 UTC
[nifi-minifi-cpp] 03/04: MINIFICPP-2045 Synchronous flow file reloading
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7e8c5c49c0987bf3e2dcf1e0e16599c5cd7e69ec
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Feb 17 01:14:07 2023 +0100
MINIFICPP-2045 Synchronous flow file reloading
... and orphan content cleanup at startup
Closes #1509
Signed-off-by: Marton Szasz <sz...@apache.org>
---
CONFIGURE.md | 1 -
conf/minifi.properties | 1 -
.../rocksdb-repos/DatabaseContentRepository.cpp | 33 ++++++
.../rocksdb-repos/DatabaseContentRepository.h | 2 +
extensions/rocksdb-repos/FlowFileRepository.cpp | 116 ++++----------------
extensions/rocksdb-repos/FlowFileRepository.h | 12 ---
libminifi/include/core/ContentRepository.h | 2 +
.../include/core/repository/FileSystemRepository.h | 2 +
.../core/repository/VolatileContentRepository.h | 4 +
libminifi/include/properties/Configuration.h | 1 -
libminifi/src/Configuration.cpp | 1 -
.../src/core/repository/FileSystemRepository.cpp | 13 +++
libminifi/test/flow-tests/SessionTests.cpp | 8 +-
.../test/persistence-tests/PersistenceTests.cpp | 10 +-
.../rocksdb-tests/DBContentRepositoryTests.cpp | 39 +++++++
libminifi/test/rocksdb-tests/EncryptionTests.cpp | 9 +-
libminifi/test/rocksdb-tests/RepoTests.cpp | 120 ++++++++++++++++-----
libminifi/test/unit/FileSystemRepositoryTests.cpp | 24 +++++
18 files changed, 242 insertions(+), 156 deletions(-)
diff --git a/CONFIGURE.md b/CONFIGURE.md
index a56504f96..304407ecb 100644
--- a/CONFIGURE.md
+++ b/CONFIGURE.md
@@ -160,7 +160,6 @@ folder. You may specify your own path in place of these defaults.
in minifi.properties
nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repository
nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
- nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
#### Shared database
diff --git a/conf/minifi.properties b/conf/minifi.properties
index dca8e39fb..491a94016 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -28,7 +28,6 @@ nifi.provenance.repository.directory.default=${MINIFI_HOME}/provenance_repositor
nifi.provenance.repository.max.storage.time=1 MIN
nifi.provenance.repository.max.storage.size=1 MB
nifi.flowfile.repository.directory.default=${MINIFI_HOME}/flowfile_repository
-nifi.flowfile.checkpoint.directory.default=${MINIFI_HOME}/flowfile_checkpoint
nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_repository
nifi.provenance.repository.class.name=NoOpRepository
nifi.content.repository.class.name=DatabaseContentRepository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.cpp b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
index 0924534f0..881de1085 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.cpp
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.cpp
@@ -20,6 +20,7 @@
#include <memory>
#include <string>
#include <utility>
+#include <vector>
#include "encryption/RocksDbEncryptionProvider.h"
#include "RocksDbStream.h"
@@ -179,6 +180,38 @@ std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::R
return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get()), true, batch);
}
+/**
+ * Deletes every entry of the content database that no live resource claim refers to.
+ *
+ * An entry is treated as an orphan when its key is missing from count_map_ or is recorded
+ * there with a count of zero. Orphan keys are collected in one pass over the database and
+ * removed with a single write batch. Failures are only logged; nothing is returned.
+ */
+void DatabaseContentRepository::clearOrphans() {
+  if (!is_valid_ || !db_) {
+    logger_->log_error("Cannot delete orphan content entries, repository is invalid");
+    return;
+  }
+  auto opendb = db_->open();
+  if (!opendb) {
+    logger_->log_error("Cannot delete orphan content entries, could not open repository");
+    return;
+  }
+  // count_map_ is read under count_map_mutex_, the same way FileSystemRepository::clearOrphans()
+  // does; the lock is kept until the delete is issued so the counts cannot change in between
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  std::vector<std::string> keys_to_be_deleted;
+  auto it = opendb->NewIterator(rocksdb::ReadOptions());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    auto key = it->key().ToString();
+    auto claim_it = count_map_.find(key);
+    if (claim_it == count_map_.end() || claim_it->second == 0) {
+      logger_->log_error("Deleting orphan resource %s", key);
+      keys_to_be_deleted.push_back(std::move(key));
+    }
+  }
+  if (keys_to_be_deleted.empty()) {
+    // nothing to clean up, skip the needless database write
+    return;
+  }
+  auto batch = opendb->createWriteBatch();
+  for (const auto& key : keys_to_be_deleted) {
+    batch.Delete(key);
+  }
+
+  rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
+  if (!status.ok()) {
+    logger_->log_error("Could not delete orphan contents from rocksdb database: %s", status.ToString());
+  }
+}
+
REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index 9961d2607..98f3acb79 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -67,6 +67,8 @@ class DatabaseContentRepository : public core::ContentRepository {
bool remove(const minifi::ResourceClaim &claim) override;
bool exists(const minifi::ResourceClaim &streamId) override;
+ void clearOrphans() override;
+
private:
std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 690a797a8..ecb9a1bcc 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -111,9 +111,6 @@ void FlowFileRepository::printStats() {
void FlowFileRepository::run() {
auto last = std::chrono::steady_clock::now();
- if (isRunning()) {
- prune_stored_flowfiles();
- }
while (isRunning()) {
std::this_thread::sleep_for(purge_period_);
flush();
@@ -126,38 +123,29 @@ void FlowFileRepository::run() {
flush();
}
-void FlowFileRepository::prune_stored_flowfiles() {
- const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{config_->getHome()}, DbEncryptionOptions{checkpoint_dir_.string(), ENCRYPTION_KEY_NAME});
- logger_->log_info("Using %s FlowFileRepository checkpoint", encrypted_env ? "encrypted" : "plaintext");
-
- auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
- db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
- db_opts.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
- db_opts.set(&rocksdb::DBOptions::use_direct_reads, true);
- if (encrypted_env) {
- db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
- } else {
- db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
- }
- };
- auto checkpointDB = minifi::internal::RocksDatabase::create(set_db_opts, {}, checkpoint_dir_.string(), minifi::internal::RocksDbMode::ReadOnly);
- std::optional<minifi::internal::OpenRocksDb> opendb;
- if (nullptr != checkpoint_) {
- opendb = checkpointDB->open();
- if (opendb) {
- logger_->log_trace("Successfully opened checkpoint database at '%s'", checkpoint_dir_.string());
- } else {
- logger_->log_error("Couldn't open checkpoint database at '%s' using live database", checkpoint_dir_.string());
- opendb = db_->open();
- }
- if (!opendb) {
- logger_->log_trace("Could not open neither the checkpoint nor the live database.");
- return;
+bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
+ constexpr int RETRY_COUNT = 3;
+ std::chrono::milliseconds wait_time = 0ms;
+ for (int i=0; i < RETRY_COUNT; ++i) {
+ auto status = operation();
+ if (status.ok()) {
+ logger_->log_trace("Rocksdb operation executed successfully");
+ return true;
}
- } else {
- logger_->log_trace("Could not open checkpoint as object doesn't exist. Likely not needed or file system error.");
+ logger_->log_error("Rocksdb operation failed: %s", status.ToString());
+ wait_time += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
+ std::this_thread::sleep_for(wait_time);
+ }
+ return false;
+}
+
+void FlowFileRepository::initialize_repository() {
+ auto opendb = db_->open();
+ if (!opendb) {
+ logger_->log_trace("Couldn't open database to load existing flow files");
return;
}
+ logger_->log_info("Reading existing flow files from database");
auto it = opendb->NewIterator(rocksdb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
@@ -191,62 +179,8 @@ void FlowFileRepository::prune_stored_flowfiles() {
keys_to_delete.enqueue(key);
}
}
-}
-
-bool FlowFileRepository::ExecuteWithRetry(const std::function<rocksdb::Status()>& operation) {
- std::chrono::milliseconds waitTime = 0ms;
- for (int i=0; i < 3; ++i) {
- auto status = operation();
- if (status.ok()) {
- logger_->log_trace("Rocksdb operation executed successfully");
- return true;
- }
- logger_->log_error("Rocksdb operation failed: %s", status.ToString());
- waitTime += FLOWFILE_REPOSITORY_RETRY_INTERVAL_INCREMENTS;
- std::this_thread::sleep_for(waitTime);
- }
- return false;
-}
-
-/**
- * Returns True if there is data to interrogate.
- * @return true if our db has data stored.
- */
-bool FlowFileRepository::need_checkpoint(minifi::internal::OpenRocksDb& opendb) {
- auto it = opendb.NewIterator(rocksdb::ReadOptions());
- it->SeekToFirst();
- return it->Valid();
-}
-void FlowFileRepository::initialize_repository() {
- checkpoint_.reset();
- auto opendb = db_->open();
- if (!opendb) {
- logger_->log_trace("Couldn't open database, no way to checkpoint");
- return;
- }
- // first we need to establish a checkpoint iff it is needed.
- if (!need_checkpoint(*opendb)) {
- logger_->log_trace("Do not need checkpoint");
- return;
- }
- // delete any previous copy
- if (utils::file::delete_dir(checkpoint_dir_) < 0) {
- logger_->log_error("Could not delete existing checkpoint directory '%s'", checkpoint_dir_.string());
- return;
- }
- std::unique_ptr<rocksdb::Checkpoint> checkpoint;
- rocksdb::Status checkpoint_status = opendb->NewCheckpoint(checkpoint);
- if (!checkpoint_status.ok()) {
- logger_->log_error("Could not create checkpoint object: %s", checkpoint_status.ToString());
- return;
- }
- checkpoint_status = checkpoint->CreateCheckpoint(checkpoint_dir_.string());
- if (!checkpoint_status.ok()) {
- logger_->log_error("Could not initialize checkpoint: %s", checkpoint_status.ToString());
- return;
- }
- checkpoint_ = std::move(checkpoint);
- logger_->log_trace("Created checkpoint in directory '%s'", checkpoint_dir_.string());
+ flush();
+ content_repo_->clearOrphans();
}
void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) {
@@ -266,12 +200,6 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
}
logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
- value.clear();
- if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) {
- checkpoint_dir_ = value;
- }
- logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string());
-
const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index aeda5af9c..832fde642 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -66,13 +66,11 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
}
explicit FlowFileRepository(const std::string& repo_name = "",
- std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
: ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
- checkpoint_dir_(std::move(checkpoint_dir)),
logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
}
@@ -112,23 +110,13 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
void initialize_repository();
- /**
- * Returns true if a checkpoint is needed at startup
- * @return true if a checkpoint is needed.
- */
- static bool need_checkpoint(minifi::internal::OpenRocksDb& opendb);
-
- void prune_stored_flowfiles();
-
std::thread& getThread() override {
return thread_;
}
- std::filesystem::path checkpoint_dir_;
moodycamel::ConcurrentQueue<std::string> keys_to_delete;
std::shared_ptr<core::ContentRepository> content_repo_;
std::unique_ptr<minifi::internal::RocksDatabase> db_;
- std::unique_ptr<rocksdb::Checkpoint> checkpoint_;
std::unique_ptr<FlowFileLoader> swap_loader_;
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<minifi::Configure> config_;
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index 84ac4eded..7d92634fb 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -51,6 +51,8 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) override;
+ virtual void clearOrphans() = 0;
+
protected:
std::string directory_;
std::mutex count_map_mutex_;
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 66cec39ef..01926f5bd 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -49,6 +49,8 @@ class FileSystemRepository : public core::ContentRepository {
bool remove(const minifi::ResourceClaim& claim) override;
std::shared_ptr<ContentSession> createSession() override;
+ void clearOrphans() override;
+
private:
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 54c1dcc6f..62e9fd80b 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -94,6 +94,10 @@ class VolatileContentRepository : public core::ContentRepository {
*/
bool remove(const minifi::ResourceClaim &claim) override;
+  // Intentionally a no-op: there are no persisted orphans to delete.
+  void clearOrphans() override {}
+
private:
VolatileRepositoryData repo_data_;
bool minimize_locking_;
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index eb2f598dc..25e6b5ebc 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -66,7 +66,6 @@ class Configuration : public Properties {
static constexpr const char *nifi_provenance_repository_max_storage_time = "nifi.provenance.repository.max.storage.time";
static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
- static constexpr const char *nifi_flowfile_checkpoint_directory_default = "nifi.flowfile.checkpoint.directory.default";
static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure";
static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth";
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index 69b24e9e8..78ae790f7 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -51,7 +51,6 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default},
core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default},
- core::ConfigurationProperty{Configuration::nifi_flowfile_checkpoint_directory_default},
core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default},
core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp
index b41cbcfaf..034e9c36a 100644
--- a/libminifi/src/core/repository/FileSystemRepository.cpp
+++ b/libminifi/src/core/repository/FileSystemRepository.cpp
@@ -59,4 +59,17 @@ std::shared_ptr<ContentSession> FileSystemRepository::createSession() {
return std::make_shared<ForwardingContentSession>(sharedFromThis());
}
+/**
+ * Removes every file listed in the repository directory that no live resource claim refers to.
+ *
+ * A file is treated as an orphan when its path ("<directory_>/<file name>") is missing from
+ * count_map_ or is recorded there with a count of zero. count_map_mutex_ is held for the
+ * whole listing. A file that cannot be removed is reported in the log instead of being
+ * ignored silently.
+ */
+void FileSystemRepository::clearOrphans() {
+  std::lock_guard<std::mutex> lock(count_map_mutex_);
+  utils::file::list_dir(directory_, [&] (auto& /*dir*/, auto& filename) {
+    const auto path = directory_ + "/" + filename.string();
+    const auto it = count_map_.find(path);
+    if (it == count_map_.end() || it->second == 0) {
+      logger_->log_debug("Deleting orphan resource %s", path);
+      // std::remove signals failure with a non-zero return value
+      if (std::remove(path.c_str()) != 0) {
+        logger_->log_error("Could not delete orphan resource %s", path);
+      }
+    }
+    return true;
+  }, logger_, false);
+}
+
} // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
index 59cd8e26b..b722e93b1 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -46,12 +46,6 @@ class TestProcessor : public minifi::core::Processor {
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
};
-#ifdef WIN32
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = ".\\sessiontest_flowfile_checkpoint";
-#else
-const std::string SESSIONTEST_FLOWFILE_CHECKPOINT_DIR = "./sessiontest_flowfile_checkpoint";
-#endif
-
TEST_CASE("Import null data") {
TestController testController;
LogTestController::getInstance().setDebug<core::ContentRepository>();
@@ -69,7 +63,7 @@ TEST_CASE("Import null data") {
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
std::shared_ptr<core::Repository> prov_repo = core::createRepository("nooprepository");
- std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", SESSIONTEST_FLOWFILE_CHECKPOINT_DIR);
+ std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 530aa06c6..e417ae417 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -53,12 +53,6 @@ class TestProcessor : public minifi::core::Processor {
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
};
-#ifdef WIN32
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = ".\\persistencetest_flowfile_checkpoint";
-#else
-const std::string PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR = "./persistencetest_flowfile_checkpoint";
-#endif
-
struct TestFlow{
TestFlow(const std::shared_ptr<core::Repository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo,
const std::function<std::unique_ptr<core::Processor>(utils::Identifier&)>& processorGenerator, const core::Relationship& relationshipToOutput)
@@ -179,7 +173,7 @@ TEST_CASE("Processors Can Store FlowFiles", "[TestP1]") {
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
- auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+ auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -286,7 +280,7 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
- std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", PERSISTENCETEST_FLOWFILE_CHECKPOINT_DIR);
+ std::shared_ptr<core::Repository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo;
SECTION("VolatileContentRepository") {
testController.getLogger()->log_info("Using VolatileContentRepository");
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 0a50cd129..942f1544e 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -242,3 +242,42 @@ TEST_CASE("ProcessSession::append should append to the flowfile and set its size
TEST_CASE("ProcessSession::read can read zero length flowfiles without crash (RocksDB)", "[zerolengthread]") {
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::DatabaseContentRepository>());
}
+
+// Returns the number of entries stored in the RocksDB database found at `dir`.
+// The enclosing test fails (REQUIRE) when the database cannot be opened.
+size_t getDbSize(const std::filesystem::path& dir) {
+  auto database = minifi::internal::RocksDatabase::create({}, {}, dir.string());
+  auto opened = database->open();
+  REQUIRE(opened);
+
+  size_t entry_count = 0;
+  auto cursor = opened->NewIterator({});
+  cursor->SeekToFirst();
+  while (cursor->Valid()) {
+    ++entry_count;
+    cursor->Next();
+  }
+  return entry_count;
+}
+
+// Writes one content entry, lets the owning repository instance go away, then checks that
+// clearOrphans() on a fresh instance over the same directory removes that entry.
+TEST_CASE("DBContentRepository can clear orphan entries") {
+  TestController controller;
+  auto repo_dir = controller.createTempDirectory();
+  auto config = std::make_shared<org::apache::nifi::minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, repo_dir.string());
+
+  {
+    auto writer_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(writer_repo->initialize(config));
+
+    minifi::ResourceClaim claim(writer_repo);
+    writer_repo->write(claim)->write("hi");
+    // the extra stream count keeps the content from being deleted when `claim` is destroyed
+    writer_repo->incrementStreamCount(claim);
+  }
+
+  // the entry written above is still in the database
+  REQUIRE(getDbSize(repo_dir) == 1);
+
+  {
+    auto cleaner_repo = std::make_shared<core::repository::DatabaseContentRepository>();
+    REQUIRE(cleaner_repo->initialize(config));
+    cleaner_repo->clearOrphans();
+  }
+
+  // after the cleanup nothing is left
+  REQUIRE(getDbSize(repo_dir) == 0);
+}
diff --git a/libminifi/test/rocksdb-tests/EncryptionTests.cpp b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
index 2fff68980..b544d7016 100644
--- a/libminifi/test/rocksdb-tests/EncryptionTests.cpp
+++ b/libminifi/test/rocksdb-tests/EncryptionTests.cpp
@@ -33,7 +33,6 @@ class FFRepoFixture : public TestController {
LogTestController::getInstance().setTrace<FlowFileRepository>();
home_ = createTempDirectory();
repo_dir_ = home_ / "flowfile_repo";
- checkpoint_dir_ = home_ / "checkpoint_dir";
config_ = std::make_shared<minifi::Configure>();
config_->setHome(home_);
container_ = std::make_unique<minifi::Connection>(nullptr, nullptr, "container");
@@ -50,7 +49,7 @@ class FFRepoFixture : public TestController {
template<typename Fn>
void runWithNewRepository(Fn&& fn) {
- auto repository = std::make_shared<FlowFileRepository>("ff", checkpoint_dir_, repo_dir_.string());
+ auto repository = std::make_shared<FlowFileRepository>("ff", repo_dir_.string());
repository->initialize(config_);
std::map<std::string, core::Connectable*> container_map;
container_map[container_->getUUIDStr()] = container_.get();
@@ -65,7 +64,6 @@ class FFRepoFixture : public TestController {
std::unique_ptr<minifi::Connection> container_;
std::filesystem::path home_;
std::filesystem::path repo_dir_;
- std::filesystem::path checkpoint_dir_;
std::shared_ptr<minifi::Configure> config_;
std::shared_ptr<core::repository::VolatileContentRepository> content_repo_;
};
@@ -93,14 +91,11 @@ TEST_CASE_METHOD(FFRepoFixture, "FlowFileRepository creates checkpoint and loads
REQUIRE(container_->isEmpty());
runWithNewRepository([&] (const std::shared_ptr<core::repository::FlowFileRepository>& /*repo*/) {
- // wait for the flowfiles to be loaded from the checkpoint
+ // wait for the flowfiles to be loaded
bool success = utils::verifyEventHappenedInPollTime(std::chrono::seconds{5}, [&] {
return !container_->isEmpty();
});
REQUIRE(success);
- REQUIRE(utils::verifyLogLinePresenceInPollTime(
- std::chrono::seconds{5},
- "Successfully opened checkpoint database at '" + checkpoint_dir_.string() + "'"));
std::set<std::shared_ptr<core::FlowFile>> expired;
auto flowfile = container_->poll(expired);
REQUIRE(expired.empty());
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index e280c0432..ef4d073bf 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -39,12 +39,6 @@ using namespace std::literals::chrono_literals;
namespace {
-#ifdef WIN32
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = ".\\repotest_flowfile_checkpoint";
-#else
-const std::string REPOTEST_FLOWFILE_CHECKPOINT_DIR = "./repotest_flowfile_checkpoint";
-#endif
-
namespace {
class TestProcessor : public minifi::core::Processor {
public:
@@ -72,7 +66,7 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
repository->initialize(std::make_shared<minifi::Configure>());
@@ -83,8 +77,6 @@ TEST_CASE("Test Repo Empty Value Attribute", "[TestFFR1]") {
REQUIRE(true == file->Persist(repository));
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
repository->stop();
}
@@ -94,7 +86,7 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
repository->initialize(std::make_shared<minifi::Configure>());
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
@@ -106,8 +98,6 @@ TEST_CASE("Test Repo Empty Key Attribute ", "[TestFFR2]") {
REQUIRE(true == file->Persist(repository));
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
repository->stop();
}
@@ -117,7 +107,7 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
TestController testController;
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
repository->initialize(std::make_shared<org::apache::nifi::minifi::Configure>());
@@ -154,8 +144,6 @@ TEST_CASE("Test Repo Key Attribute Verify ", "[TestFFR3]") {
REQUIRE(record2->getAttribute("keyB", value));
REQUIRE(value.empty());
-
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
}
TEST_CASE("Test Delete Content ", "[TestFFR4]") {
@@ -167,7 +155,7 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
std::fstream file;
file.open(dir / "tstFile.ext", std::ios::out);
@@ -203,14 +191,11 @@ TEST_CASE("Test Delete Content ", "[TestFFR4]") {
std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
REQUIRE(!fileopen.good());
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
LogTestController::getInstance().reset();
}
TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
TestController testController;
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
LogTestController::getInstance().setDebug<core::ContentRepository>();
LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
@@ -219,7 +204,7 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
auto dir = testController.createTempDirectory();
- std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", REPOTEST_FLOWFILE_CHECKPOINT_DIR, dir.string(), 0ms, 0, 1ms);
+ std::shared_ptr<core::repository::FlowFileRepository> repository = std::make_shared<core::repository::FlowFileRepository>("ff", dir.string(), 0ms, 0, 1ms);
std::fstream file;
file.open(dir / "tstFile.ext", std::ios::out);
@@ -264,8 +249,6 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
std::ifstream fileopen(dir / "tstFile.ext", std::ios::in);
REQUIRE(fileopen.fail());
- utils::file::FileUtils::delete_dir(REPOTEST_FLOWFILE_CHECKPOINT_DIR, true);
-
LogTestController::getInstance().reset();
}
@@ -284,7 +267,7 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
config->set(minifi::Configure::nifi_flowfile_repository_directory_default, (dir / "flowfile_repository").string());
std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestThreadedRepository>();
- auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository", REPOTEST_FLOWFILE_CHECKPOINT_DIR);
+ auto ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
ff_repository->initialize(config);
content_repo->initialize(config);
@@ -361,7 +344,7 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
class TestFlowFileRepository: public core::repository::FlowFileRepository{
public:
explicit TestFlowFileRepository(const std::string& name)
- : FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+ : FlowFileRepository(name, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
void flush() override {
@@ -438,4 +421,93 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
}
}
+TEST_CASE("FlowFileRepository triggers content repo orphan clear") {  // content written with no flow file created for it must be gone from content_dir after FlowFileRepository::loadComponent
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+ TestController testController;
+ auto ff_dir = testController.createTempDirectory();  // configured below as the flow file repository directory
+ auto content_dir = testController.createTempDirectory();  // configured below as the content directory; this is the directory the assertions list
+
+ auto config = std::make_shared<minifi::Configure>();  // one config shared by every repository instance in this test
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());  // NOTE(review): presumably the key FileSystemRepository reads its directory from - the listings of content_dir below depend on it
+
+ {  // scope: this repository instance and the claim are destroyed before the directory is inspected
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ REQUIRE(content_repo->initialize(config));
+ minifi::ResourceClaim claim(content_repo);
+ content_repo->write(claim)->write("hi");  // the payload is arbitrary; only the existence of the entry is checked
+ // the extra stream count keeps the content from being deleted when `claim` is destroyed at the end of this scope
+ content_repo->incrementStreamCount(claim);
+ }
+
+ REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).size() == 1);  // precondition: exactly one entry was left behind
+
+ auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();  // fresh instances initialized from the same config as above
+ REQUIRE(ff_repo->initialize(config));
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ REQUIRE(content_repo->initialize(config));
+
+ ff_repo->loadComponent(content_repo);  // call under test: the only step between the "one entry" and the "empty" assertions
+
+ REQUIRE(utils::file::list_dir_all(content_dir, testController.getLogger()).empty());  // the leftover entry has been removed
+}
+
+TEST_CASE("FlowFileRepository synchronously pushes existing flow files") {  // stores one flow file via MultiPut, then checks that fresh repositories hand it to the connection right after loadComponent
+ LogTestController::getInstance().setDebug<core::ContentRepository>();
+ LogTestController::getInstance().setDebug<core::repository::FileSystemRepository>();
+ LogTestController::getInstance().setDebug<core::repository::FlowFileRepository>();
+ TestController testController;
+ auto ff_dir = testController.createTempDirectory();  // configured below as the flow file repository directory
+ auto content_dir = testController.createTempDirectory();  // configured below as the content directory
+
+ auto config = std::make_shared<minifi::Configure>();  // one config shared by both scopes, so both use the same directories
+ config->set(minifi::Configure::nifi_flowfile_repository_directory_default, ff_dir.string());
+ config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, content_dir.string());
+
+
+ utils::Identifier ff_id;  // assigned in the first scope, compared against the polled flow file in the second
+ auto connection_id = utils::IdGenerator::getIdGenerator()->generate();  // both Connection objects below are created with this same id
+
+ {  // first scope: build a flow file and store its serialized form in the flow file repository
+ auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+ REQUIRE(ff_repo->initialize(config));
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ REQUIRE(content_repo->initialize(config));
+ auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+ auto claim = std::make_shared<minifi::ResourceClaim>(content_repo);
+
+ std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>> flow_data;  // (key, serialized record) pairs handed to MultiPut
+ auto ff = std::make_shared<minifi::FlowFileRecord>();
+ ff_id = ff->getUUID();
+ ff->setConnection(conn.get());  // NOTE(review): presumably makes the serialized record carry connection_id - confirm against FlowFileRecord::Serialize
+ content_repo->write(*claim)->write("hello");
+ ff->setResourceClaim(claim);
+ auto stream = std::make_unique<minifi::io::BufferStream>();
+ ff->Serialize(*stream);  // serialized after the connection and claim are set
+ flow_data.emplace_back(ff->getUUIDStr(), std::move(stream));  // keyed by the flow file's UUID string
+
+ REQUIRE(ff_repo->MultiPut(flow_data));
+ }
+
+ {  // second scope: fresh repository and connection instances built from the same config and connection id
+ auto ff_repo = std::make_shared<core::repository::FlowFileRepository>();
+ REQUIRE(ff_repo->initialize(config));
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ REQUIRE(content_repo->initialize(config));
+ auto conn = std::make_shared<minifi::Connection>(ff_repo, content_repo, "TestConnection", connection_id);
+
+ ff_repo->setConnectionMap({{connection_id.to_string(), conn.get()}});  // registered before loadComponent; maps the id string to the raw connection pointer
+ ff_repo->loadComponent(content_repo);  // call under test; the poll below follows immediately, with no wait or retry in between
+
+ std::set<std::shared_ptr<core::FlowFile>> expired;  // filled by poll(); must stay empty
+ std::shared_ptr<core::FlowFile> ff = conn->poll(expired);
+ REQUIRE(expired.empty());
+ REQUIRE(ff);  // a flow file is already available on the connection
+ REQUIRE(ff->getUUID() == ff_id);  // and it is the one stored in the first scope
+ }
+}
+
} // namespace
diff --git a/libminifi/test/unit/FileSystemRepositoryTests.cpp b/libminifi/test/unit/FileSystemRepositoryTests.cpp
index b49fe60c5..c61579dd9 100644
--- a/libminifi/test/unit/FileSystemRepositoryTests.cpp
+++ b/libminifi/test/unit/FileSystemRepositoryTests.cpp
@@ -58,3 +58,27 @@ TEST_CASE("Test Physical memory usage", "[testphysicalmemoryusage]") {
return end_memory < start_memory + int64_t{5_MB};
}, 100ms));
}
+
+TEST_CASE("FileSystemRepository can clear orphan entries") {  // an entry left behind by one repository instance must be removed by clearOrphans() on a fresh instance
+ TestController testController;
+ auto dir = testController.createTempDirectory();  // content directory; this is the directory the assertions list
+ auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>();  // shared by both repository instances below
+ configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir.string());
+ {  // scope: this repository instance and the claim are destroyed before the directory is inspected
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();
+ REQUIRE(content_repo->initialize(configuration));
+
+ minifi::ResourceClaim claim(content_repo);
+ content_repo->write(claim)->write("hi");  // the payload is arbitrary; only the existence of the entry is checked
+ // the extra stream count keeps the content from being deleted when `claim` is destroyed at the end of this scope
+ content_repo->incrementStreamCount(claim);
+ }
+
+ REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).size() == 1);  // precondition: exactly one entry was left behind
+
+ auto content_repo = std::make_shared<core::repository::FileSystemRepository>();  // fresh instance initialized from the same configuration
+ REQUIRE(content_repo->initialize(configuration));
+ content_repo->clearOrphans();  // call under test
+
+ REQUIRE(utils::file::list_dir_all(dir, testController.getLogger()).empty());  // the leftover entry has been removed
+}