You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/02 15:43:42 UTC

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1435: MINIFICPP-1959 - Avoid dangling content references when using VolatileFlowFileRepository

adam-markovics commented on code in PR #1435:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1435#discussion_r1011863921


##########
libminifi/src/core/ProcessSession.cpp:
##########
@@ -958,54 +958,72 @@ void ProcessSession::persistFlowFilesBeforeTransfer(
   auto flowFileRepo = process_context_->getFlowFileRepository();
   auto contentRepo = process_context_->getContentRepository();
 
-  for (auto& [target, flows] : transactionMap) {
-    const auto connection = dynamic_cast<Connection*>(target);
-    const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
-    for (auto &ff : flows) {
-      if (shouldDropEmptyFiles && ff->getSize() == 0) {
-        // the receiver will drop this FF
-        continue;
+  enum class Type {
+    Dropped, Transferred
+  };
+
+  auto forEachFlowFile = [&] (Type type, auto fn) {
+    for (auto& [target, flows] : transactionMap) {
+      const auto connection = dynamic_cast<Connection*>(target);
+      const bool shouldDropEmptyFiles = connection && connection->getDropEmptyFlowFiles();
+      for (auto &ff : flows) {
+        auto snapshotIt = modifiedFlowFiles.find(ff->getUUID());
+        auto original = snapshotIt != modifiedFlowFiles.end() ? snapshotIt->second.snapshot : nullptr;
+        if (shouldDropEmptyFiles && ff->getSize() == 0) {
+          // the receiver will drop this FF
+          if (type == Type::Dropped) {
+            fn(ff, original);
+          }
+        } else {
+          if (type == Type::Transferred) {
+            fn(ff, original);
+          }
+        }
       }
+    }
+  };
 
-      std::unique_ptr<io::BufferStream> stream(new io::BufferStream());
-      std::static_pointer_cast<FlowFileRecord>(ff)->Serialize(*stream);
+  // collect serialized flowfiles
+  forEachFlowFile(Type::Transferred, [&] (auto& ff, auto& /*original*/) {
+    std::unique_ptr<io::BufferStream> stream(new io::BufferStream());

Review Comment:
   `std::make_unique` is more modern here, e.g. `auto stream = std::make_unique<io::BufferStream>();`



##########
libminifi/test/TestBase.h:
##########
@@ -331,10 +331,19 @@ class TestPlan {
 
 class TestController {
  public:
+  struct PlanConfig {
+    std::shared_ptr<minifi::Configure> configuration = {};
+    const char* state_dir = nullptr;

Review Comment:
   I think `std::string_view` would be better; like the raw pointer, it does not own the string either. A default member initializer would then no longer be necessary, nor would one be needed for the other members (the `shared_ptr`s).



##########
libminifi/test/unit/ProcessSessionTests.cpp:
##########
@@ -138,3 +144,44 @@ TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::VolatileContentRepository>());
   ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared<core::repository::FileSystemRepository>());
 }
+
+struct VolatileFlowFileRepositoryTestAccessor {
+  METHOD_ACCESSOR(flush);
+};
+
+class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository {
+ public:
+  explicit TestVolatileFlowFileRepository(const std::string& name) : core::SerializableComponent(name) {}
+
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
+    auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);});
+    return VolatileFlowFileRepository::MultiPut(data);
+  }
+};
+
+TEST_CASE("ProcessSession::commit avoids dangling ResourceClaims when using VolatileFlowFileRepository", "[incrementbefore]") {
+  auto configuration = std::make_shared<minifi::Configure>();
+  configuration->set(minifi::Configure::nifi_volatile_repository_options_flowfile_max_count, "2");
+  auto ff_repo = std::make_shared<TestVolatileFlowFileRepository>("flowfile");
+  Fixture fixture({
+    .configuration = std::move(configuration),
+    .flow_file_repo = ff_repo
+  });
+  auto& session = fixture.processSession();
+
+  const auto flow_file_1 = session.create();
+  const auto flow_file_2 = session.create();
+  const auto flow_file_3 = session.create();
+  session.transfer(flow_file_1, Success);
+  session.transfer(flow_file_2, Success);
+  session.transfer(flow_file_3, Success);
+  session.commit();
+
+  REQUIRE(flow_file_1->getResourceClaim()->getFlowFileRecordOwnedCount() == 1);
+  REQUIRE(flow_file_2->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);
+  REQUIRE(flow_file_3->getResourceClaim()->getFlowFileRecordOwnedCount() == 2);

Review Comment:
   Why is it 1, 2, 2? I thought it should be 0, 1, 1. Who is the other owner besides `ff_repo`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org