You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/12 09:29:13 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1328: MINIFICPP-1812 - Clean up Repository threads

lordgamez commented on code in PR #1328:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1328#discussion_r871108878


##########
extensions/rocksdb-repos/ProvenanceRepository.h:
##########
@@ -26,35 +26,31 @@
 #include "rocksdb/db.h"
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "provenance/Provenance.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "provenance/Provenance.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace provenance {
+namespace org::apache::nifi::minifi::provenance {
 
-#define PROVENANCE_DIRECTORY "./provenance_repository"
-#define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto PROVENANCE_DIRECTORY = "./provenance_repository";
+constexpr auto MAX_PROVENANCE_STORAGE_SIZE = 10 * 1024 * 1024;  // 10M
 constexpr auto MAX_PROVENANCE_ENTRY_LIFE_TIME = std::chrono::minutes(1);
 constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class ProvenanceRepository : public core::Repository {
+class ProvenanceRepository : public core::ThreadedRepository {
  public:
-  ProvenanceRepository(const std::string& name, const utils::Identifier& /*uuid*/)
-      : ProvenanceRepository(name) {
-  }
-  // Constructor
-  /*!
-   * Create a new provenance repository
-   */
-  explicit ProvenanceRepository(const std::string& repo_name = "", std::string directory = PROVENANCE_DIRECTORY, std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
-      int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE, std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        Repository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) {
+  ProvenanceRepository(const std::string &name, const utils::Identifier & /*uuid*/)
+          : ProvenanceRepository(name) {
+  }
+
+  explicit ProvenanceRepository(const std::string &repo_name = "", std::string directory = PROVENANCE_DIRECTORY,
+                                std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
+                                int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+                                std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
+          : core::SerializableComponent(repo_name),
+            ThreadedRepository(repo_name.length() > 0 ? repo_name : core::getClassName<ProvenanceRepository>(), directory,
+                       maxPartitionMillis, maxPartitionBytes, purgePeriod) {

Review Comment:
   Just a nitpick, but I don't think the deeper alignment of this line is necessary.



##########
extensions/rocksdb-repos/FlowFileRepository.h:
##########
@@ -27,30 +27,25 @@
 #include "rocksdb/options.h"
 #include "rocksdb/slice.h"
 #include "rocksdb/utilities/checkpoint.h"
-#include "core/Repository.h"
 #include "core/Core.h"
-#include "Connection.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "core/ThreadedRepository.h"
+#include "Connection.h"
 #include "concurrentqueue.h"
 #include "database/RocksDatabase.h"
 #include "encryption/RocksDbEncryptionProvider.h"
 #include "utils/crypto/EncryptionProvider.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
 #ifdef WIN32
-#define FLOWFILE_REPOSITORY_DIRECTORY ".\\flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY ".\\flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = ".\\flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = ".\\flowfile_checkpoint";
 #else
-#define FLOWFILE_REPOSITORY_DIRECTORY "./flowfile_repository"
-#define FLOWFILE_CHECKPOINT_DIRECTORY "./flowfile_checkpoint"
+constexpr auto FLOWFILE_REPOSITORY_DIRECTORY = "./flowfile_repository";
+constexpr auto FLOWFILE_CHECKPOINT_DIRECTORY = "./flowfile_checkpoint";
 #endif
-#define MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE (10*1024*1024)  // 10M
+constexpr auto MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE = 10*1024*1024;  // 10M

Review Comment:
   You could use `10_MiB` from Literals.h. The same could be used in ProvenanceRepository.h and Repository.h.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {

Review Comment:
   Do we need the fail_safe argument? It seems that in all scenarios this is set to true, so we could just suppress the exceptions in all cases and remove this parameter.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc, class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
+      return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
+      return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return {};
+    } else {
+      throw std::runtime_error("Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &) {
+    if (fail_safe) {
+      return {};
+    }
+  }
+
+  throw std::runtime_error("Support for the provided configuration class could not be found");
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */

Review Comment:
   We could concatenate the nested namespaces here as well.



##########
libminifi/include/core/Repository.h:
##########
@@ -240,30 +205,20 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  // Mutex for protection
-  std::mutex mutex_;
   // repository directory

Review Comment:
   You could remove this and the similar uninformative member comments below.



##########
libminifi/include/core/repository/VolatileRepository.h:
##########
@@ -39,31 +39,24 @@ namespace nifi {
 namespace minifi {
 namespace core {
 namespace repository {
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Woverloaded-virtual"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Woverloaded-virtual"
-#endif
+
 /**
  * Flow File repository
  * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
  */
-template<typename T>
-class VolatileRepository : public core::Repository {
+template<typename T, typename T_Repository = core::Repository>

Review Comment:
   Maybe you could rename the template parameters to KeyType and RepositoryType to make them clearer.



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc, class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
+      return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
+      return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }
+    if (fail_safe) {
+      return {};
+    } else {
+      throw std::runtime_error("Support for the provided configuration class could not be found");
+    }
+  } catch (const std::runtime_error &) {
+    if (fail_safe) {
+      return {};
+    }
+  }
+
+  throw std::runtime_error("Support for the provided configuration class could not be found");

Review Comment:
   We don't need to throw here if we simply rethrow the original exception with a bare `throw;` in the catch block.



##########
libminifi/include/core/Repository.h:
##########
@@ -240,30 +205,20 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   std::map<std::string, core::Connectable*> containers_;
 
   std::map<std::string, core::Connectable*> connection_map_;
-  // Mutex for protection
-  std::mutex mutex_;
   // repository directory
   std::string directory_;
-  // max db entry life time
+  // max db entry lifetime
   std::chrono::milliseconds max_partition_millis_;
   // max db size
   int64_t max_partition_bytes_;
   // purge period
   std::chrono::milliseconds purge_period_;
-  // thread
-  std::thread thread_;
-  // whether the monitoring thread is running for the repo while it was enabled
-  std::atomic<bool> running_;
-  // whether stop accepting provenace event
+  // whether stop accepting provenance event

Review Comment:
   Typo: Whether TO stop...



##########
libminifi/src/core/RepositoryFactory.cpp:
##########
@@ -96,6 +96,62 @@ std::unique_ptr<core::ContentRepository> createContentRepository(const std::stri
   throw std::runtime_error("Support for the provided configuration class could not be found");
 }
 
+class NoOpThreadedRepository : public core::ThreadedRepository {
+ public:
+  explicit NoOpThreadedRepository(const std::string& repo_name)
+    : core::SerializableComponent(repo_name),
+    ThreadedRepository(repo_name) {
+  }
+
+  ~NoOpThreadedRepository() override {
+    stop();
+  }
+
+ private:
+  void run() override {
+  }
+
+  std::thread& getThread() override {
+    return thread_;
+  }
+
+  std::thread thread_;
+};
+
+std::unique_ptr<core::ThreadedRepository> createThreadedRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {
+  std::string class_name_lc = configuration_class_name;
+  std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
+  try {
+    auto return_obj = core::ClassLoader::getDefaultClassLoader().instantiate<core::ThreadedRepository>(class_name_lc, class_name_lc);
+    if (return_obj) {
+      return_obj->setName(repo_name);
+      return return_obj;
+    }
+    // if the desired repos don't exist, we can try doing string matches and rely on volatile repositories
+    if (class_name_lc == "flowfilerepository" || class_name_lc == "volatileflowfilerepository") {
+      return_obj = instantiate<repository::VolatileFlowFileRepository>(repo_name);
+    } else if (class_name_lc == "provenancerepository" || class_name_lc == "volatileprovenancefilerepository") {
+      return_obj = instantiate<repository::VolatileProvenanceRepository>(repo_name);
+    } else if (class_name_lc == "nooprepository") {
+      return_obj = std::make_unique<core::NoOpThreadedRepository>(repo_name);
+    }
+    if (return_obj) {
+      return return_obj;
+    }

Review Comment:
   I don't think we need this; we could just return the repository directly in each branch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org