You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/11/15 18:38:20 UTC

[nifi-minifi-cpp] 02/04: MINIFICPP-1959 - Ensure that VolatileFlowFileRepository does not delete referenced resource

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 8bda98d4a959cdf18459cc57f156e2d6dc557e75
Author: Adam Debreceni <ad...@apache.org>
AuthorDate: Fri Oct 14 14:15:40 2022 +0200

    MINIFICPP-1959 - Ensure that VolatileFlowFileRepository does not delete referenced resource
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    This closes #1435
---
 extensions/sftp/tests/ListSFTPTests.cpp            |  2 +-
 .../tests/unit/TailFileTests.cpp                   |  4 +-
 .../core/repository/VolatileFlowFileRepository.h   |  4 +
 .../include/core/repository/VolatileRepository.h   | 11 +--
 libminifi/src/core/ProcessSession.cpp              | 94 +++++++++++++---------
 libminifi/test/TestBase.cpp                        | 32 ++++++--
 libminifi/test/TestBase.h                          | 13 ++-
 .../test/unit/ContentRepositoryDependentTests.h    |  2 +-
 libminifi/test/unit/ProcessSessionTests.cpp        | 51 +++++++++++-
 9 files changed, 156 insertions(+), 57 deletions(-)

diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index f98a51d40..0ddabf900 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -94,7 +94,7 @@ class ListSFTPTestsFixture {
     list_sftp.reset();
     plan.reset();
 
-    plan = testController.createPlan(configuration, state_dir.c_str());
+    plan = testController.createPlan(configuration, state_dir);
     if (list_sftp_uuid == nullptr) {
       list_sftp = plan->addProcessor(
           "ListSFTP",
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 808dbb190..ccfba30ec 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1117,7 +1117,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n
   // use persistent state storage that defaults to rocksDB, not volatile
   const auto configuration = std::make_shared<minifi::Configure>();
   {
-    auto test_plan = testController.createPlan(configuration, state_dir.c_str());
+    auto test_plan = testController.createPlan(configuration, state_dir);
     auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
     test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file);
     auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
@@ -1138,7 +1138,7 @@ TEST_CASE("TailFile finds and finishes the renamed file and continues with the n
   createTempFile(log_dir, "test.log", "line eight is the last line\n");
 
   {
-    auto test_plan = testController.createPlan(configuration, state_dir.c_str());
+    auto test_plan = testController.createPlan(configuration, state_dir);
     auto tail_file = test_plan->addProcessor("TailFile", tail_file_uuid, "Tail", {success_relationship});
     test_plan->setProperty(tail_file, minifi::processors::TailFile::FileName.getName(), test_file);
     auto log_attr = test_plan->addProcessor("LogAttribute", "Log", success_relationship, true);
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 79fbf7671..23b00be97 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -26,6 +26,8 @@
 #include "core/ThreadedRepository.h"
 #include "utils/gsl.h"
 
+struct VolatileFlowFileRepositoryTestAccessor;
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -38,6 +40,8 @@ namespace repository {
  * those which we no longer hold.
  */
 class VolatileFlowFileRepository : public VolatileRepository<std::string, core::ThreadedRepository> {
+  friend struct ::VolatileFlowFileRepositoryTestAccessor;
+
  public:
   explicit VolatileFlowFileRepository(const std::string& repo_name = "",
                                       const std::string& /*dir*/ = REPOSITORY_DIRECTORY,
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index d3771e612..097f551c5 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -25,6 +25,7 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <cinttypes>
 
 #include "AtomicRepoEntries.h"
 #include "Connection.h"
@@ -134,7 +135,7 @@ class VolatileRepository : public RepositoryType {
   // current size of the volatile repo.
   std::atomic<size_t> current_size_;
   // current index.
-  std::atomic<uint16_t> current_index_;
+  std::atomic<uint32_t> current_index_;
   // value vector that exists for non blocking iteration over
   // objects that store data for this repo instance.
   std::vector<AtomicEntry<KeyType>*> value_vector_;
@@ -226,11 +227,11 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const
   size_t reclaimed_size = 0;
   RepoValue<KeyType> old_value;
   do {
-    uint16_t private_index = current_index_.fetch_add(1);
+    uint32_t private_index = current_index_.fetch_add(1);
     // round robin through the beginning
     if (private_index >= max_count_) {
-      uint16_t new_index = 0;
-      if (current_index_.compare_exchange_weak(new_index, 0)) {
+      uint32_t new_index = private_index + 1;
+      if (current_index_.compare_exchange_weak(new_index, 1)) {
         private_index = 0;
       } else {
         continue;
@@ -256,7 +257,7 @@ bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const
   } while (!updated);
   current_size_ += size;
 
-  logger_->log_debug("VolatileRepository -- put %u %u", current_size_.load(), current_index_.load());
+  logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, current_size_.load(), current_index_.load());
   return true;
 }
 
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index b9ca57272..d472fff0e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -976,54 +976,72 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
   auto flowFileRepo = process_context_->getFlowFileRepository();
   auto contentRepo = process_context_->getContentRepository();
 
-  for (auto& [target, flows] : transactionMap) {
-    const auto connection = dynamic_cast<Connection*>(target);
-    const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
-    for (auto &ff : flows) {
-      if (shouldDropEmptyFiles && ff->getSize() == 0) {
-        // the receiver will drop this FF
-        continue;
+  enum class Type {
+    Dropped, Transferred
+  };
+
+  auto forEachFlowFile = [&] (Type type, auto fn) {
+    for (auto& [target, flows] : transactionMap) {
+      const auto connection = dynamic_cast<Connection*>(target);
+      const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
+      for (auto &ff : flows) {
+        auto snapshotIt = modifiedFlowFiles.find(ff->getUUID());
+        auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
+        if (shouldDropEmptyFiles && ff->getSize() == 0) {
+          // the receiver will drop this FF
+          if (type == Type::Dropped) {
+            fn(ff, original);
+          }
+        } else {
+          if (type == Type::Transferred) {
+            fn(ff, original);
+          }
+        }
       }
+    }
+  };
 
-      std::unique_ptr<io::BufferStream> stream(new io::BufferStream());
-      std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
+  // collect serialized flowfiles
+  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+    auto stream = std::make_unique<io::BufferStream>();
+    std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
 
-      flowData.emplace_back(ff->getUUIDStr(), std::move(stream));
-    }
-  }
+    flowData.emplace_back(ff->getUUIDStr(), std::move(stream));
+  });
+
+  // increment on behalf of the to-be-persisted instance
+  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+    if (auto claim = ff->getResourceClaim())
+      claim->increaseFlowFileRecordOwnedCount();
+  });
 
   if (!flowFileRepo->MultiPut(flowData)) {
     logger_->log_error("Failed execute multiput on FF repo!");
+    // decrement on behalf of the instance that failed to be persisted
+    forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+      if (auto claim = ff->getResourceClaim())
+        claim->decreaseFlowFileRecordOwnedCount();
+    });
     throw Exception(PROCESS_SESSION_EXCEPTION, "Failed to put flowfiles to repository");
   }
 
-  for (auto& [target, flows] : transactionMap) {
-    const auto connection = dynamic_cast<Connection*>(target);
-    const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
-    for (auto &ff : flows) {
-      utils::Identifier uuid = ff->getUUID();
-      auto snapshotIt = modifiedFlowFiles.find(uuid);
-      auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
-      if (shouldDropEmptyFiles && ff->getSize() == 0) {
-        // the receiver promised to drop this FF, no need for it anymore
-        if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
-          // original must be non-null since this flowFile is already stored in the repos ->
-          // must have come from a session->get()
-          assert(original);
-          ff->setStoredToRepository(false);
-        }
-        continue;
-      }
-      auto claim = ff->getResourceClaim();
-      // increment on behalf of the persisted instance
-      if (claim) claim->increaseFlowFileRecordOwnedCount();
-      auto originalClaim = original ? original->getResourceClaim() : nullptr;
-      // decrement on behalf of the overridden instance if any
-      if (originalClaim) originalClaim->decreaseFlowFileRecordOwnedCount();
-
-      ff->setStoredToRepository(true);
+  // decrement on behalf of the overridden instance if any
+  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& original) {
+    if (auto original_claim = original ? original->getResourceClaim() : nullptr) {
+      original_claim->decreaseFlowFileRecordOwnedCount();
     }
-  }
+    ff->setStoredToRepository(true);
+  });
+
+  forEachFlowFile(Type::Dropped, [&] (auto& ff, auto& original) {
+    // the receiver promised to drop this FF, no need for it anymore
+    if (ff->isStored() && flowFileRepo->Delete(ff->getUUIDStr())) {
+      // original must be non-null since this flowFile is already stored in the repos ->
+      // must have come from a session->get()
+      gsl_Assert(original);
+      ff->setStoredToRepository(false);
+    }
+  });
 }
 
 void ProcessSession::ensureNonNullResourceClaim(
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index b8599f9cc..f09983677 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -619,16 +619,34 @@ TestController::TestController()
   flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
 }
 
-std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, const char* state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) {
-  if (configuration == nullptr) {
-    configuration = std::make_shared<minifi::Configure>();
-    configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
-      configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory());
+std::shared_ptr<TestPlan> TestController::createPlan(PlanConfig config) {
+  if (!config.configuration) {
+    config.configuration = std::make_shared<minifi::Configure>();
+    config.configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+    config.configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory());
   }
 
-  content_repo->initialize(configuration);
+  if (!config.flow_file_repo)
+    config.flow_file_repo = std::make_shared<TestRepository>();
 
-  return std::make_shared<TestPlan>(std::move(content_repo), std::make_shared<TestRepository>(), std::make_shared<TestRepository>(), flow_version_, configuration, state_dir);
+  if (!config.content_repo)
+    config.content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>();
+
+  config.content_repo->initialize(config.configuration);
+  config.flow_file_repo->initialize(config.configuration);
+  config.flow_file_repo->loadComponent(config.content_repo);
+
+  return std::make_shared<TestPlan>(
+      std::move(config.content_repo), std::move(config.flow_file_repo), std::make_shared<TestRepository>(),
+      flow_version_, config.configuration, config.state_dir ? config.state_dir->string().c_str() : nullptr);
+}
+
+std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Configure> configuration, std::optional<std::filesystem::path> state_dir, std::shared_ptr<minifi::core::ContentRepository> content_repo) {
+  return createPlan(PlanConfig{
+    .configuration = std::move(configuration),
+    .state_dir = std::move(state_dir),
+    .content_repo = std::move(content_repo)
+  });
 }
 
 std::string TestController::createTempDirectory() {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index fee34300d..df401203c 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -331,10 +331,19 @@ class TestPlan {
 
 class TestController {
  public:
+  struct PlanConfig {
+    std::shared_ptr<minifi::Configure> configuration = {};
+    std::optional<std::filesystem::path> state_dir = {};
+    std::shared_ptr<minifi::core::ContentRepository> content_repo = {};
+    std::shared_ptr<minifi::core::Repository> flow_file_repo = {};
+  };
+
   TestController();
 
-  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr,
-      std::shared_ptr<minifi::core::ContentRepository> content_repo = std::make_shared<minifi::core::repository::VolatileContentRepository>());
+  std::shared_ptr<TestPlan> createPlan(PlanConfig config);
+
+  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, std::optional<std::filesystem::path> state_dir = {},
+      std::shared_ptr<minifi::core::ContentRepository> content_repo = nullptr);
 
   static void runSession(const std::shared_ptr<TestPlan> &plan,
                   bool runToCompletion = true,
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 6674729e8..f19533ba3 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -60,7 +60,7 @@ class Fixture {
   const core::Relationship Failure{"failure", "something has gone awry"};
 
   explicit Fixture(std::shared_ptr<core::ContentRepository> content_repo) {
-    test_plan_ = test_controller_.createPlan(nullptr, nullptr, content_repo);
+    test_plan_ = test_controller_.createPlan(nullptr, std::nullopt, content_repo);
     dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
     context_ = [this] {
       test_plan_->runNextProcessor();
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index 55a39b272..ff4a78645 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -25,16 +25,22 @@
 #include "../Catch.h"
 #include "ContentRepositoryDependentTests.h"
 #include "Processor.h"
+#include "core/repository/VolatileFlowFileRepository.h"
+#include "IntegrationTestUtils.h"
+#include "../Utils.h"
 
 namespace {
 
 class Fixture {
  public:
+  explicit Fixture(TestController::PlanConfig config = {}): plan_config_(std::move(config)) {}
+
   minifi::core::ProcessSession &processSession() { return *process_session_; }
 
  private:
   TestController test_controller_;
-  std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan();
+  TestController::PlanConfig plan_config_;
+  std::shared_ptr<TestPlan> test_plan_ = test_controller_.createPlan(plan_config_);
   std::shared_ptr<minifi::core::Processor> dummy_processor_ = test_plan_->addProcessor("DummyProcessor", "dummyProcessor");
   std::shared_ptr<minifi::core::ProcessContext> context_ = [this] { test_plan_->runNextProcessor(); return test_plan_->getCurrentContext(); }();
   std::unique_ptr<minifi::core::ProcessSession> process_session_ = std::make_unique<core::ProcessSession>(context_);
@@ -122,3 +128,46 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
 }
+
+struct VolatileFlowFileRepositoryTestAccessor {
+  METHOD_ACCESSOR(flush);
+};
+
+class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository {
+ public:
+  explicit TestVolatileFlowFileRepository(const std::string& name) : core::SerializableComponent(name) {}
+
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
+    auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);});
+    return VolatileFlowFileRepository::MultiPut(data);
+  }
+};
+
+TEST_CASE("ProcessSession::commit avoids dangling ResourceClaims when using VolatileFlowFileRepository", "[incrementbefore]") {
+  auto configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_volatile_repository_options_flowfile_max_count, "2");
+  auto ff_repo = std::make_shared<TestVolatileFlowFileRepository>("flowfile");
+  Fixture fixture({
+    .configuration = std::move(configuration),
+    .flow_file_repo = ff_repo
+  });
+  auto& session = fixture.processSession();
+
+  const auto flow_file_1 = session.create();
+  const auto flow_file_2 = session.create();
+  const auto flow_file_3 = session.create();
+  session.transfer(flow_file_1, Success);
+  session.transfer(flow_file_2, Success);
+  session.transfer(flow_file_3, Success);
+  session.commit();
+
+  // flow_files are owned by the shared_ptr on the stack and the ff_repo
+  // but the first one has been evicted from the ff_repo
+  REQUIRE(flow_file_1->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
+  REQUIRE(flow_file_2->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+  REQUIRE(flow_file_3->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+
+  REQUIRE(flow_file_1->getResourceClaim()->exists());
+  REQUIRE(flow_file_2->getResourceClaim()->exists());
+  REQUIRE(flow_file_3->getResourceClaim()->exists());
+}