You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/06/09 16:54:35 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-1249 - Restore agent from persisted repo

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9a22c  MINIFICPP-1249 - Restore agent from persisted repo
0f9a22c is described below

commit 0f9a22cb2734fff261963df0d04dd18374d136cd
Author: Adam Debreceni <ad...@192.168.0.205>
AuthorDate: Tue Jun 9 11:52:09 2020 +0200

    MINIFICPP-1249 - Restore agent from persisted repo
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #804
---
 .../standard-processors/processors/GetFile.h       | 12 ++--
 libminifi/include/Connection.h                     |  8 +--
 libminifi/include/core/Connectable.h               |  2 +-
 libminifi/test/rocksdb-tests/RepoTests.cpp         | 74 ++++++++++++++++++++++
 4 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/extensions/standard-processors/processors/GetFile.h b/extensions/standard-processors/processors/GetFile.h
index 5e9b04a..837e14b 100644
--- a/extensions/standard-processors/processors/GetFile.h
+++ b/extensions/standard-processors/processors/GetFile.h
@@ -66,11 +66,11 @@ class GetFileMetrics : public state::response::ResponseNode {
   virtual ~GetFileMetrics() {
 
   }
-  virtual std::string getName() const {
+  std::string getName() const override {
     return core::Connectable::getName();
   }
 
-  virtual std::vector<state::response::SerializedResponseNode> serialize() {
+  std::vector<state::response::SerializedResponseNode> serialize() override {
     std::vector<state::response::SerializedResponseNode> resp;
 
     state::response::SerializedResponseNode iter;
@@ -143,23 +143,23 @@ class GetFile : public core::Processor, public state::response::MetricsNodeSourc
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
   /**
    * Execution trigger for the GetFile Processor
    * @param context processor context
    * @param session processor session reference.
    */
-  virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
 
   // Initialize, over write by NiFi GetFile
-  virtual void initialize(void);
+  void initialize(void) override;
   /**
    * performs a listing on the directory.
    * @param request get file request.
    */
   void performListing(const GetFileRequest &request);
 
-  int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector);
+  int16_t getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) override;
 
  protected:
 
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index ed452e4..ed4349c 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -152,7 +152,7 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   uint64_t getQueueDataSize() {
     return queued_data_size_;
   }
-  void put(std::shared_ptr<core::Connectable> flow) {
+  void put(std::shared_ptr<core::Connectable> flow) override {
     std::shared_ptr<core::FlowFile> ff = std::static_pointer_cast<core::FlowFile>(flow);
     if (nullptr != ff) {
       put(ff);
@@ -169,15 +169,15 @@ class Connection : public core::Connectable, public std::enable_shared_from_this
   // Drain the flow records
   void drain();
 
-  void yield() {
+  void yield() override {
 
   }
 
-  bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return !isEmpty();
   }
 
-  bool isRunning() {
+  bool isRunning() override {
     return true;
   }
 
diff --git a/libminifi/include/core/Connectable.h b/libminifi/include/core/Connectable.h
index 82a9878..485732d 100644
--- a/libminifi/include/core/Connectable.h
+++ b/libminifi/include/core/Connectable.h
@@ -74,7 +74,7 @@ class Connectable : public CoreComponent {
    */
   std::set<std::shared_ptr<Connectable>> getOutGoingConnections(const std::string &relationship) const;
 
-  void put(std::shared_ptr<Connectable> flow) {
+  virtual void put(std::shared_ptr<Connectable> flow) {
 
   }
 
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 34e0a39..e970c40 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -252,3 +252,77 @@ TEST_CASE("Test Validate Checkpoint ", "[TestFFR5]") {
   LogTestController::getInstance().reset();
 }
 
+TEST_CASE("Test FlowFile Restore", "[TestFFR6]") {  // FlowFiles persisted by a Connection are restored into it on repo start
+  TestController testController;
+  LogTestController::getInstance().setDebug<core::ContentRepository>();
+  LogTestController::getInstance().setTrace<core::repository::FileSystemRepository>();
+  LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
+  LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
+
+  char format[] = "/var/tmp/testRepo.XXXXXX";
+  auto dir = testController.createTempDirectory(format);
+  // Place both the content and the flowfile repository under the temp dir.
+  auto config = std::make_shared<minifi::Configure>();
+  config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, utils::file::FileUtils::concat_path(dir, "content_repository"));
+  config->set(minifi::Configure::nifi_flowfile_repository_directory_default, utils::file::FileUtils::concat_path(dir, "flowfile_repository"));
+
+  std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::repository::FlowFileRepository> ff_repository = std::make_shared<core::repository::FlowFileRepository>("flowFileRepository");
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::FileSystemRepository>();
+  ff_repository->initialize(config);
+  content_repo->initialize(config);
+
+  std::shared_ptr<minifi::Connection> input = std::make_shared<minifi::Connection>(ff_repository, content_repo, "Input");
+
+  auto root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+  root->addConnection(input);
+
+  auto flowConfig = std::unique_ptr<core::FlowConfiguration>{new core::FlowConfiguration(prov_repo, ff_repository, content_repo, nullptr, config, "")};
+  auto flowController = std::make_shared<minifi::FlowController>(prov_repo, ff_repository, config, std::move(flowConfig), content_repo, "", true);
+
+  std::string data = "banana";
+  minifi::io::DataStream content(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+
+  /**
+   * Currently the Connection is responsible for persisting incoming FlowFiles
+   * to the FlowFileRepository. On restart, the FlowFileRepository reads the
+   * persisted database and moves each FlowFile back into the Connection that
+   * persisted it, if that Connection can still be found; the flow may have
+   * changed since, in which case the orphaned FlowFiles are deleted.
+   */
+  {  // objects created in this scope are destroyed at its closing brace
+    std::shared_ptr<core::Processor> processor = std::make_shared<core::Processor>("dummy");
+    std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(processor);
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+    auto context = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    core::ProcessSession sessionGenFlowFile(context);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(content, flow);
+    input->put(flow);  // the Connection persists it to the FlowFileRepository
+  }
+
+  // Remove the FlowFile from the Connection; its persisted record is
+  // expected to remain in the FlowFileRepository.
+  std::set<std::shared_ptr<core::FlowFile>> expiredFiles;
+  auto oldFlow = input->poll(expiredFiles);
+  REQUIRE(oldFlow);
+  REQUIRE(expiredFiles.empty());
+
+  // Loading the flow notifies the FlowFileRepository of the flow structure,
+  // i.e. which Connections (more precisely, which Connectables) exist, so
+  // persisted FlowFiles can be matched back to their owners.
+  flowController->load(root);
+  // Starting the repository first scans the persisted data and restores every
+  // FlowFile whose owner Connectable is still present.
+  ff_repository->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{500});  // NOTE(review): fixed wait, presumably for the restore to finish; timing-dependent
+
+  // Check that the FlowFile was restored into the `input` Connection when
+  // the FlowFileRepository started up.
+  auto newFlow = input->poll(expiredFiles);
+  REQUIRE(newFlow);
+  REQUIRE(expiredFiles.empty());
+
+  LogTestController::getInstance().reset();
+}