You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/29 15:36:39 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447059548



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
       Initialization errors should make the test fail early.
   ```suggestion
       const bool init_success = ff_repository->initialize(config);
       REQUIRE(init_success);
   ```

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       All text files (including source) should end with a newline character. Please add one.

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";

Review comment:
       Repo tests should place temporary repositories under `/var/tmp` to avoid crashes on systems where `/tmp` is mounted as `tmpfs`.
   
   https://issues.apache.org/jira/browse/MINIFICPP-1188
   
   ```suggestion
     char format[] = "/var/tmp/testRepo.XXXXXX";
   ```

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;

Review comment:
       This alias doesn't seem to gain us anything; consider spelling out the type directly.

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -366,7 +366,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  if (nullptr == claim_) {

Review comment:
       Why this change? As far as I understand, the original code creates a (potentially empty) claim for the flow file, whereas the new version only creates a claim if there is associated content. I think empty `ResourceClaim`s are valid for empty flow files.
   
   related: https://issues.apache.org/jira/browse/MINIFICPP-1122




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org