Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/06/29 12:51:46 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

adamdebreceni opened a new pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447447817



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;

Review comment:
       indeed, it is now only used once (previously it was used multiple times), will remove







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447461792



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;

Review comment:
       done

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -366,7 +366,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  if (nullptr == claim_) {

Review comment:
       done







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447269648



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
       More like ```REQUIRE(ff_repository->initialize(config))``` imho :)







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       https://stackoverflow.com/a/729795/3997716
   
   Maybe it's more intuitive if you think about them as line-endings, not line separators.
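
   A minimal sketch of the "line ending, not line separator" view, assuming the POSIX definition of a line (zero or more non-newline characters followed by a terminating newline). The helper names `endsWithNewline` and `countCompleteLines` are hypothetical and not part of MiNiFi; treating an empty file as valid is also an assumption.

```cpp
#include <cstddef>
#include <string>

// True if the text is made up only of complete, newline-terminated lines.
// An empty string is accepted here as "zero lines" (an assumption of this sketch).
bool endsWithNewline(const std::string& contents) {
  return contents.empty() || contents.back() == '\n';
}

// Counts newline characters, i.e. complete lines. A trailing fragment
// without a terminating '\n' is not counted as a line.
std::size_t countCompleteLines(const std::string& contents) {
  std::size_t lines = 0;
  for (const char c : contents) {
    if (c == '\n') {
      ++lines;
    }
  }
  return lines;
}
```

   With this view, a source file whose last character is `}` contains an unterminated fragment rather than a final line, which is why line-oriented tools can treat it differently.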







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447547205



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
       I agree that returning a bool to signal error is not the best idea. I would prefer the use of exceptions for all errors that are not usually part of the program flow (i.e. exceptional).
   
   I'm not aware of a list of changes we are "holding back", but creating a Jira issue with Fix version = "1.0" could be one way of maintaining such a list, because we can search for those later.
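
   As a rough illustration of the exception-based alternative, here is a minimal sketch. `SketchRepository` is a hypothetical stand-in, not the actual `FlowFileRepository` interface, and the empty-directory check is only an assumed failure condition.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a repository; not the MiNiFi API.
class SketchRepository {
 public:
  // Throws on failure instead of returning false, so the caller
  // cannot silently ignore an initialization error.
  void initialize(const std::string& directory) {
    if (directory.empty()) {
      throw std::runtime_error("repository directory must not be empty");
    }
    directory_ = directory;
    initialized_ = true;
  }

  bool isInitialized() const { return initialized_; }

 private:
  std::string directory_;
  bool initialized_ = false;
};
```

   A caller that cannot recover simply lets the exception propagate; a caller that can recover catches `std::runtime_error` at the point where it knows what to do.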







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447059548



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
       Errors should make the test fail early.
   ```suggestion
       const bool init_success = ff_repository->initialize(config);
       REQUIRE(init_success);
   ```
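
   To illustrate why failing early matters: Catch2's `REQUIRE` stops the current test case on failure, whereas `CHECK` records the failure and continues. The sketch below imitates that with a hypothetical `requireTrue` helper that throws; `runSetup` and the step names are illustrative only and do not call any MiNiFi code.

```cpp
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for Catch2's REQUIRE: aborts the caller by throwing.
void requireTrue(bool condition, const std::string& what) {
  if (!condition) {
    throw std::runtime_error("requirement failed: " + what);
  }
}

// Runs the setup steps in order and records which ones were reached.
// init_result stands in for the value returned by ff_repository->initialize(config).
std::vector<std::string> runSetup(bool init_result) {
  std::vector<std::string> reached;
  try {
    reached.push_back("initialize");
    const bool init_success = init_result;
    requireTrue(init_success, "initialize");
    // Only reached when initialization succeeded.
    reached.push_back("loadComponent");
    reached.push_back("start");
  } catch (const std::runtime_error&) {
    reached.push_back("aborted");
  }
  return reached;
}
```

   When initialization fails, the later steps never run, so the test reports the real cause instead of a confusing follow-on failure.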

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       All text files (including source) should end with a newline character. Please add one.

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";

Review comment:
       Repo tests should place temporary repos under `/var/tmp`, to avoid crashing on systems with `/tmp` mounted as `tmpfs`.
   
   https://issues.apache.org/jira/browse/MINIFICPP-1188
   
   ```suggestion
     char format[] = "/var/tmp/testRepo.XXXXXX";
   ```
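
   For readers unfamiliar with the `XXXXXX` template, here is a minimal sketch of how such a directory could be created directly with POSIX `mkdtemp`. The helpers `makeTempRepoDir` and `removeTempRepoDir` are hypothetical; whether `TestController::createTempDirectory` works this way internally is an assumption, not something shown in this thread.

```cpp
#include <stdlib.h>  // mkdtemp (POSIX)
#include <unistd.h>  // rmdir (POSIX)

#include <string>
#include <vector>

// Creates a unique directory from a template ending in "XXXXXX".
// Returns the created path, or an empty string on failure.
std::string makeTempRepoDir(const std::string& pattern) {
  // mkdtemp modifies its argument in place, so copy it into a writable buffer.
  std::vector<char> buffer(pattern.begin(), pattern.end());
  buffer.push_back('\0');
  const char* created = mkdtemp(buffer.data());
  return created != nullptr ? std::string(created) : std::string();
}

// Removes an (empty) directory; returns true on success.
bool removeTempRepoDir(const std::string& path) {
  return rmdir(path.c_str()) == 0;
}
```

   Calling `makeTempRepoDir("/var/tmp/testRepo.XXXXXX")` yields a path with the six `X` characters replaced by a unique suffix.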

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;

Review comment:
       It seems like we don't gain anything with this alias.

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -366,7 +366,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  if (nullptr == claim_) {

Review comment:
       Why this change? As far as I understand, here we make a potentially empty claim for the flow file, but the new version only makes a claim if there is associated content. I think empty `ResourceClaim`s are valid for empty flow files.
   
   related: https://issues.apache.org/jira/browse/MINIFICPP-1122







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447454207



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       will add, but why do we do this?







[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
arpadboda closed pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826


   





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447464781



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);

Review comment:
       done
   
   agree with the fail-fast philosophy; what I don't agree with is `Repository::initialize` returning `false` on failure. I don't see how the user is expected to handle it: of the two places it is called, one doesn't check the return value and one terminates the application
   
   do we have a list of changes we are holding back until a new major release?
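
   One way to keep a `bool` return while making an unchecked call visible is the C++17 `[[nodiscard]]` attribute. This is a different technique from what the PR does, shown only as a sketch: `NoDiscardRepository` is hypothetical, and it assumes the code base can be compiled as C++17.

```cpp
#include <string>

// Hypothetical stand-in; not the MiNiFi Repository interface.
class NoDiscardRepository {
 public:
  // Compilers are encouraged to warn when the returned value is ignored.
  [[nodiscard]] bool initialize(const std::string& directory) {
    initialized_ = !directory.empty();
    return initialized_;
  }

  bool isInitialized() const { return initialized_; }

 private:
  bool initialized_ = false;
};
```

   A call such as `repo.initialize(dir);` with the result dropped would then typically produce a compiler warning, while `const bool ok = repo.initialize(dir);` compiles cleanly.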







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447559056



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       https://stackoverflow.com/a/729795/3997716







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447453017



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";

Review comment:
       Interesting, will roll back.
   - Do we have a list of platforms we plan to support?
   - Is `/var/tmp` the second-best place? According to [this](https://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch05s15.html), files there are preserved between reboots.
   







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447269150



##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -366,7 +366,7 @@ bool FlowFileRecord::DeSerialize(const uint8_t *buffer, const int bufferSize) {
     return false;
   }
 
-  if (nullptr == claim_) {

Review comment:
       Yep, the goal would be to *always* have a claim, even if the content is empty.
   The reason is that we then don't have to deal with failing streams and claims when handling flowfiles. A good example of this is PutFile or PublishKafka, where handling both types of "empty" flowfiles is a pain. 







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447465005



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";

Review comment:
       done







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447549997



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";

Review comment:
       1. I don't think we have a clear list. I think we just aim for broad GNU/Linux coverage.
   2. Not sure if it's the second-best place. Normally we clean up temporary directories, but this doesn't happen when tests crash, so there may be leftovers in some rare situations. In a discussion around the time the referenced bug was fixed, we considered disabling direct IO, but that would mean we don't test the same behavior that we are running, so going for a different directory seemed to be the better approach.







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447271999



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}

Review comment:
       Maybe it's a bit too late, but this looks like a busy wait to me. 
   It naturally works, but I would avoid that on our already overloaded CI infra. 







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447465066



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}
+      ff_repository->stop();
+    }};
+
+    ff_repository->onFlush_ = [&] {
+      if (++flush_counter != 1) {
+        return;
+      }
+      
+      for (int keyIdx = 0; keyIdx < 100; ++keyIdx) {
+        auto file = std::make_shared<minifi::FlowFileRecord>(ff_repository, nullptr);
+        file->setUuidConnection(connection->getUUIDStr());
+        // Serialize is sync
+        file->Serialize();
+        if (keyIdx % 2 == 0) {
+          // delete every second flowFile
+          ff_repository->Delete(file->getUUIDStr());
+        }
+      }
+      stop = true;
+      // wait for the shutdown thread to start waiting for the worker thread
+      std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    };
+
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+
+    shutdown.join();
+  }
+
+  // check if the deleted flowfiles are indeed deleted
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+    ff_repository->setConnectionMap(connectionMap);
+    ff_repository->initialize(config);
+    ff_repository->loadComponent(nullptr);
+    ff_repository->start();
+    std::this_thread::sleep_for(std::chrono::milliseconds{100});
+    REQUIRE(connection->getQueueSize() == 50);
+  }
+}

Review comment:
       done

##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}

Review comment:
       done







[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #826: MINIFICPP-1274 - Commit delete operation before shutdown

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #826:
URL: https://github.com/apache/nifi-minifi-cpp/pull/826#discussion_r447271999



##########
File path: libminifi/test/rocksdb-tests/RepoTests.cpp
##########
@@ -326,3 +326,82 @@ TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
+  using ConnectionMap = std::map<std::string, std::shared_ptr<core::Connectable>>;
+
+  class TestFlowFileRepository: public core::repository::FlowFileRepository{
+   public:
+    explicit TestFlowFileRepository(const std::string& name)
+        : core::SerializableComponent(name),
+          FlowFileRepository(name, FLOWFILE_REPOSITORY_DIRECTORY, MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                             MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1) {}
+    void flush() override {
+      FlowFileRepository::flush();
+      if (onFlush_) {
+        onFlush_();
+      }
+    }
+    std::function<void()> onFlush_;
+  };
+
+  TestController testController;
+  char format[] = "/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  auto connection = std::make_shared<minifi::Connection>(nullptr, nullptr, "Connection");
+  ConnectionMap connectionMap{{connection->getUUIDStr(), connection}};
+  // initialize repository
+  {
+    std::shared_ptr<TestFlowFileRepository> ff_repository = std::make_shared<TestFlowFileRepository>("flowFileRepository");
+
+    std::atomic<int> flush_counter{0};
+
+    std::atomic<bool> stop{false};
+    std::thread shutdown{[&] {
+      while (!stop.load()) {}

Review comment:
       Maybe it's a bit too late, but this looks like a busy wait to me. 
   It naturally works, but I would avoid that on our already overloaded CI infra. 
   
   I don't expect a CV (condition variable) here; sleeping 1ms in the body of the loop is good enough :)
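
The two alternatives mentioned in this comment can be sketched as follows. This is an illustration under assumptions, not the code that was eventually committed: `waitBySleeping` and `StopSignal` are hypothetical names, and only standard library facilities are used.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Variant 1: keep polling the flag, but give up the CPU between checks.
inline void waitBySleeping(const std::atomic<bool>& stop) {
  while (!stop.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds{1});
  }
}

// Variant 2: block on a condition variable until another thread notifies.
class StopSignal {
 public:
  void notify() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      stopped_ = true;
    }
    cv_.notify_all();
  }

  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // The predicate guards against spurious wakeups and against
    // notify() having been called before wait().
    cv_.wait(lock, [this] { return stopped_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool stopped_{false};
};
```

The sleeping variant is a one-line change to the existing loop, which is why it is "good enough" for a test; the condition variable avoids polling altogether at the cost of a little more code.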



