You are viewing a plain-text version of this content; the canonical link to the original message was a hyperlink that is not preserved in this copy.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/11/15 18:38:20 UTC
[nifi-minifi-cpp] 02/04: MINIFICPP-1959 - Ensure that VolatileFlowFileRepository does not delete referenced resource
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 8bda98d4a959cdf18459cc57f156e2d6dc557e75
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Oct 14 14:15:40 2022 +0200
MINIFICPP-1959 - Ensure that VolatileFlowFileRepository does not delete referenced resource
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1435
---
extensions/sftp/tests/ListSFTPTests.cpp | 2 +-
.../tests/unit/TailFileTests.cpp | 4 +-
.../core/repository/VolatileFlowFileRepository.h | 4 +
.../include/core/repository/VolatileRepository.h | 11 +--
libminifi/src/core/ProcessSession.cpp | 94 +++++++++++++---------
libminifi/test/TestBase.cpp | 32 ++++++--
libminifi/test/TestBase.h | 13 ++-
.../test/unit/ContentRepositoryDependentTests.h | 2 +-
libminifi/test/unit/ProcessSessionTests.cpp | 51 +++++++++++-
9 files changed, 156 insertions(+), 57 deletions(-)
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index f98a51d40..0ddabf900 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -94,7 +94,7 @@ class ListSFTPTestsFixture {
list_sftp.reset();
plan.reset();
- plan = testController.createPlan(configuration, state_dir.c_str());
+ plan = testController.createPlan(configuration, state_dir);  // createPlan now takes std::optional<std::filesystem::path>, so .c_str() is no longer needed
if (list_sftp_uuid == nullptr) {
list_sftp = plan->addProcessor(
"ListSFTP",
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 808dbb190..ccfba30ec 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1117,7 +1117,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n
// use persistent state storage that defaults to rocksDB, not volatile
const auto configuration = std::make_shared<minifi::Configure>();
{
- auto test_plan = testController.createPlan(configuration, state_dir.c_str());
+ auto test_plan = testController.createPlan(configuration, state_dir);  // state_dir is passed straight to the std::optional<std::filesystem::path> parameter
auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file);
auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
@@ -1138,7 +1138,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n
createTempFile(log_dir, "test.log", "line eight is the last line\n");
{
- auto test_plan = testController.createPlan(configuration, state_dir.c_str());
+ auto test_plan = testController.createPlan(configuration, state_dir);  // same state_dir as the earlier plan in this test, passed as std::optional<std::filesystem::path>
auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file);
auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 79fbf7671..23b00be97 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -26,6 +26,8 @@
#include "core/ThreadedRepository.h"
#include "utils/gsl.h"
+struct VolatileFlowFileRepositoryTestAccessor;  // test-only accessor, defined in ProcessSessionTests.cpp and befriended by the class below
+
namespace org {
namespace apache {
namespace nifi {
@@ -38,6 +40,8 @@ namespace repository {
* those which we no longer hold.
*/
class VolatileFlowFileRepository : public VolatileRepository<std::string, core::ThreadedRepository> {
+ friend struct ::VolatileFlowFileRepositoryTestAccessor;  // lets the unit-test accessor call flush() (see ProcessSessionTests.cpp)
+
public:
explicit VolatileFlowFileRepository(const std::string& repo_name = "",
const std::string& /*dir*/ = REPOSITORY_DIRECTORY,
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index d3771e612..097f551c5 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -25,6 +25,7 @@
#include <string>
#include <utility>
#include <vector>
+#include <cinttypes>
#include "AtomicRepoEntries.h"
#include "Connection.h"
@@ -134,7 +135,7 @@ class VolatileRepository : public RepositoryType {
// current size of the volatile repo.
std::atomic<size_t> current_size_;
// current index.
- std::atomic<uint16_t> current_index_;
+ std::atomic<uint32_t> current_index_;  // widened from uint16_t (which wraps at 65536); Put() reads it into a uint32_t private_index
// value vector that exists for non blocking iteration over
// objects that store data for this repo instance.
std::vector<AtomicEntry<KeyType>*> value_vector_;
@@ -226,11 +227,11 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const
size_t reclaimed_size = 0;
RepoValue<KeyType> old_value;
do {
- uint16_t private_index = current_index_.fetch_add(1);
+ uint32_t private_index = current_index_.fetch_add(1);  // claim a slot index; it may run past max_count_, which is handled below
// round robin through the beginning
if (private_index >= max_count_) {
- uint16_t new_index = 0;
- if (current_index_.compare_exchange_weak(new_index, 0)) {
+ uint32_t new_index = private_index + 1;  // the value our own fetch_add left behind, unless another thread advanced the index since
+ if (current_index_.compare_exchange_weak(new_index, 1)) {  // won the wrap-around: this thread takes slot 0 and the next caller gets 1; a lost or spurious CAS retries via continue
private_index = 0;
} else {
continue;
@@ -256,7 +257,7 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const
} while (!updated);
current_size_ += size;
- logger_->log_debug("VolatileRepository -- put %u %u", current_size_.load(), current_index_.load());
+ logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, current_size_.load(), current_index_.load());  // %zu for the size_t size, PRIu32 (from cinttypes) for the uint32_t index
return true;
}
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b9ca57272..d472fff0e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -976,54 +976,72 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
auto flowFileRepo = process_context_->getFlowFileRepository();
auto contentRepo = process_context_->getContentRepository();
- for (auto& [target, flows] : transactionMap) {
- const auto connection = dynamic_cast<Connection*>(target);
- const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
- for (auto &ff : flows) {
- if (shouldDropEmptyFiles && ff->getSize() == 0) {
- // the receiver will drop this FF
- continue;
+ enum class Type {  // selects which flow files a forEachFlowFile pass visits
+ Dropped, Transferred  // Dropped: empty FFs routed to a drop-empty-flowfiles connection; Transferred: all others
+ };
+
+ auto forEachFlowFile = [&] (Type type, auto fn) {  // calls fn(ff, original) for each FF of the given type; original is its snapshot from modifiedFlowFiles, or nullptr
+ for (auto& [target, flows] : transactionMap) {
+ const auto connection = dynamic_cast<Connection*>(target);
+ const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
+ for (auto &ff : flows) {
+ auto snapshotIt = modifiedFlowFiles.find(ff->getUUID());  // recomputed on every pass over the transaction map
+ auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
+ if (shouldDropEmptyFiles && ff->getSize() == 0) {
+ // the receiver will drop this FF
+ if (type == Type::Dropped) {
+ fn(ff, original);
+ }
+ } else {
+ if (type == Type::Transferred) {
+ fn(ff, original);
+ }
+ }
}
+ }
+ };
- std::unique_ptr<io::BufferStream> stream(new io::BufferStream());
- std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
+ // collect serialized flowfiles (Transferred only; dropped FFs are not handed to MultiPut)
+ forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+ auto stream = std::make_unique<io::BufferStream>();
+ std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
- flowData.emplace_back(ff->getUUIDStr(), std::move(stream));
- }
- }
+ flowData.emplace_back(ff->getUUIDStr(), std::move(stream));
+ });
+
+ // increment on behalf of the to-be-persisted instance BEFORE MultiPut, presumably so a repo that evicts entries during MultiPut (MINIFICPP-1959) cannot release a still-referenced claim
+ forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+ if (auto claim = ff->getResourceClaim())
+ claim->increaseFlowFileRecordOwnedCount();
+ });  // NOTE(review): rolled back only if MultiPut returns false; if MultiPut can throw, these increments leak - a scope guard would cover both paths
if (!flowFileRepo->MultiPut(flowData)) {
logger_->log_error("Failed execute multiput on FF repo!");
+ // MultiPut reported failure: undo the increments made above
+ forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+ if (auto claim = ff->getResourceClaim())
+ claim->decreaseFlowFileRecordOwnedCount();
+ });
throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
}
- for (auto& [target, flows] : transactionMap) {
- const auto connection = dynamic_cast<Connection*>(target);
- const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
- for (auto &ff : flows) {
- utils::Identifier uuid = ff->getUUID();
- auto snapshotIt = modifiedFlowFiles.find(uuid);
- auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
- if (shouldDropEmptyFiles && ff->getSize() == 0) {
- // the receiver promised to drop this FF, no need for it anymore
- if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
- // original must be non-null since this flowFile is already stored in the repos ->
- // must have come from a session->get()
- assert(original);
- ff->setStoredToRepository(false);
- }
- continue;
- }
- auto claim = ff->getResourceClaim();
- // increment on behalf of the persisted instance
- if (claim) claim->increaseFlowFileRecordOwnedCount();
- auto originalClaim = original ? original->getResourceClaim() : nullptr;
- // decrement on behalf of the overridden instance if any
- if (originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
-
- ff->setStoredToRepository(true);
+ // decrement on behalf of the overridden (snapshot) instance if any, then mark the FF as stored
+ forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& original) {
+ if (auto original_claim = original ? original->getResourceClaim() : nullptr) {
+ original_claim->decreaseFlowFileRecordOwnedCount();
}
- }
+ ff->setStoredToRepository(true);
+ });
+
+ forEachFlowFile(Type::Dropped, [&] (auto& ff, auto& original) {  // dropped FFs received no increment above; only their repo entry is removed here
+ // the receiver promised to drop this FF, no need for it anymore
+ if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
+ // original must be non-null since this flowFile is already stored in the repos ->
+ // must have come from a session->get()
+ gsl_Assert(original);  // replaces assert(); NOTE(review): presumably also checked in release builds - confirm gsl_Assert semantics
+ ff->setStoredToRepository(false);
+ }
+ });
}
void ProcessSession::ensureNonNullResourceClaim(
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index b8599f9cc..f09983677 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -619,16 +619,34 @@ TestController::TestController()
flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
}
-std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, const char* state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) {
- if (configuration == nullptr) {
- configuration = std::make_shared<minifi::Configure>();
- configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
- configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory());
+std::shared_ptr<TestPlan> TestController::createPlan(PlanConfig config) {  // empty PlanConfig members are defaulted below; an empty state_dir is passed on as nullptr
+ if (!config.configuration) {
+ config.configuration = std::make_shared<minifi::Configure>();
+ config.configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+ config.configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory());
}
- content_repo->initialize(configuration);
+ if (!config.flow_file_repo)  // default flow file repo: TestRepository
+ config.flow_file_repo = std::make_shared<TestRepository>();
- return std::make_shared<TestPlan>(std::move(content_repo), std::make_shared<TestRepository>(), std::make_shared<TestRepository>(), flow_version_, configuration, state_dir);
+ if (!config.content_repo)  // default content repo: VolatileContentRepository
+ config.content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>();
+
+ config.content_repo->initialize(config.configuration);
+ config.flow_file_repo->initialize(config.configuration);  // NOTE(review): runs for caller-supplied repos too, so callers should pass them uninitialized
+ config.flow_file_repo->loadComponent(config.content_repo);  // hands the content repo to the flow file repo
+
+ return std::make_shared<TestPlan>(
+ std::move(config.content_repo), std::move(config.flow_file_repo), std::make_shared<TestRepository>(),
+ flow_version_, config.configuration, config.state_dir ? config.state_dir->string().c_str() : nullptr);  // NOTE(review): the string() temporary dies at the end of this statement; safe only if TestPlan copies state_dir
+}
+
+std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, std::optional<std::filesystem::path> state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) {  // positional overload kept for existing callers; forwards to createPlan(PlanConfig)
+ return createPlan(PlanConfig{
+ .configuration = std::move(configuration),
+ .state_dir = std::move(state_dir),
+ .content_repo = std::move(content_repo)
+ });
+}
std::string TestController::createTempDirectory() {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index fee34300d..df401203c 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -331,10 +331,19 @@ class TestPlan {
class TestController {
public:
+ struct PlanConfig {  // inputs for createPlan(PlanConfig); members left empty are defaulted by createPlan
+ std::shared_ptr<minifi::Configure> configuration = {};  // null: a new Configure using UnorderedMapKeyValueStoreService and a temp content directory
+ std::optional<std::filesystem::path> state_dir = {};  // nullopt: TestPlan receives nullptr as its state dir
+ std::shared_ptr<minifi::core::ContentRepository> content_repo = {};  // null: VolatileContentRepository
+ std::shared_ptr<minifi::core::Repository> flow_file_repo = {};  // null: TestRepository; createPlan initializes it either way
+ };
+
TestController();
- std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr,
- std::shared_ptr<minifi::core::ContentRepository> content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>());
+ std::shared_ptr<TestPlan> createPlan(PlanConfig config);
+
+ std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, std::optional<std::filesystem::path> state_dir = {},
+ std::shared_ptr<minifi::core::ContentRepository> content_repo = nullptr);  // nullptr now selects the default VolatileContentRepository inside createPlan
static void runSession(const std::shared_ptr<TestPlan> &plan,
bool runToCompletion = true,
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 6674729e8..f19533ba3 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -60,7 +60,7 @@ class Fixture {
const core::Relationship Failure{"failure", "something has gone awry"};
explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) {
- test_plan_ = test_controller_.createPlan(nullptr, nullptr, content_repo);
+ test_plan_ = test_controller_.createPlan(nullptr, std::nullopt, content_repo);  // std::nullopt (no state dir), since state_dir is now std::optional<std::filesystem::path>
dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
context_ = [this] {
test_plan_->runNextProcessor();
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 55a39b272..ff4a78645 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -25,16 +25,22 @@
#include "../Catch.h"
#include "ContentRepositoryDependentTests.h"
#include "Processor.h"
+#include "core/repository/VolatileFlowFileRepository.h"
+#include "IntegrationTestUtils.h"
+#include "../Utils.h"
namespace {
class Fixture {
public:
+ explicit Fixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {}  // config is stored and later passed to createPlan when test_plan_ is initialized
+
minifi::core::ProcessSession &processSession() { return *process_session_; }
private:
TestController test_controller_;
- std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
+ TestController::PlanConfig plan_config_;  // must stay declared before test_plan_, whose initializer reads it
+ std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(plan_config_);
std::shared_ptr<minifi::core::Processor> dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
std::shared_ptr<minifi::core::ProcessContext> context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
std::unique_ptr<minifi::core::ProcessSession> process_session_ = std::make_unique<core::ProcessSession>(context_);
@@ -122,3 +128,46 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
}
+
+struct VolatileFlowFileRepositoryTestAccessor {  // befriended by VolatileFlowFileRepository
+ METHOD_ACCESSOR(flush);  // provides call_flush(repo), used by TestVolatileFlowFileRepository; macro presumably from the included test utils
+};
+
+class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository {  // VolatileFlowFileRepository that flushes at the end of every MultiPut
+ public:
+ explicit TestVolatileFlowFileRepository(const std::string& name) : core::SerializableComponent(name) {}  // initializes SerializableComponent directly, presumably because it is a virtual base - confirm
+
+ bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
+ auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);});  // flush runs on scope exit, whether MultiPut returns or throws
+ return VolatileFlowFileRepository::MultiPut(data);
+ }
+};
+
+TEST_CASE("ProcessSession::commit avoids dangling ResourceClaims when using VolatileFlowFileRepository", "[incrementbefore]") {
+ auto configuration = std::make_shared<minifi::Configure>();
+ configuration->set(minifi::Configure::nifi_volatile_repository_options_flowfile_max_count, "2");  // cap the volatile flow file repo at two entries, so committing three flow files evicts one
+ auto ff_repo = std::make_shared<TestVolatileFlowFileRepository>("flowfile");
+ Fixture fixture({
+ .configuration = std::move(configuration),
+ .flow_file_repo = ff_repo
+ });
+ auto& session = fixture.processSession();
+
+ const auto flow_file_1 = session.create();
+ const auto flow_file_2 = session.create();
+ const auto flow_file_3 = session.create();
+ session.transfer(flow_file_1, Success);
+ session.transfer(flow_file_2, Success);
+ session.transfer(flow_file_3, Success);
+ session.commit();  // persists all three; ff_repo flushes right after MultiPut
+
+ // each claim is counted for the flow_file shared_ptr on the stack and for its ff_repo entry,
+ // but the first one has been evicted from the ff_repo, leaving only the stack reference
+ REQUIRE(flow_file_1->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
+ REQUIRE(flow_file_2->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+ REQUIRE(flow_file_3->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+
+ REQUIRE(flow_file_1->getResourceClaim()->exists());  // the evicted flow file still references its claim, so the content must not have been deleted
+ REQUIRE(flow_file_2->getResourceClaim()->exists());
+ REQUIRE(flow_file_3->getResourceClaim()->exists());
+}