You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/09 10:51:19 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1431: MINIFICPP-1937 - Dynamically reopen rocksdb on column config change

adamdebreceni commented on code in PR #1431:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1431#discussion_r1017778091


##########
extensions/rocksdb-repos/database/RocksDbInstance.cpp:
##########
@@ -18,56 +18,138 @@
 
 #include "RocksDbInstance.h"
 #include <vector>
+#include <utility>
 #include "logging/LoggerConfiguration.h"
 #include "rocksdb/utilities/options_util.h"
 #include "OpenRocksDb.h"
 #include "ColumnHandle.h"
+#include "DbHandle.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 std::shared_ptr<core::logging::Logger> RocksDbInstance::logger_ = core::logging::LoggerFactory<RocksDbInstance>::getLogger();
 
-RocksDbInstance::RocksDbInstance(const std::string& path, RocksDbMode mode) : db_name_(path), mode_(mode) {}
+RocksDbInstance::RocksDbInstance(std::string path, RocksDbMode mode) : db_name_(std::move(path)), mode_(mode) {}
 
 void RocksDbInstance::invalidate() {
   std::lock_guard<std::mutex> db_guard{mtx_};
+  invalidate(db_guard);
+}
+
+void RocksDbInstance::invalidate(const std::lock_guard<std::mutex>&) {
   // discard our own instance
   columns_.clear();
   impl_.reset();
 }
 
-std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) {
+void RocksDbInstance::registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) {
+  std::lock_guard<std::mutex> db_guard{mtx_};
+  logger_->log_trace("Registering column '%s' in database '%s'", column, db_name_);
+  auto it = column_configs_.find(column);
+  if (it != column_configs_.end()) {
+    throw std::runtime_error("Configuration is already registered for column '" + column + "'");
+  }
+  column_configs_[column] = {.dbo_patch = db_options_patch, .cfo_patch = cf_options_patch};
+
+  bool need_reopen = [&] {
+    if (!impl_) {
+      logger_->log_trace("Database is already scheduled to be reopened");
+      return false;
+    }
+    {
+      rocksdb::DBOptions db_opts_copy = db_options_;
+      Writable<rocksdb::DBOptions> db_opts_writer(db_opts_copy);
+      if (db_options_patch) {
+        db_options_patch(db_opts_writer);
+        if (db_opts_writer.isModified()) {
+          logger_->log_trace("Requested a difference DBOptions than the one that was used to open the database");
+          return true;
+        }
+      }
+    }
+    if (!columns_.contains(column)) {
+      logger_->log_trace("Previously unspecified column, will dynamically create the column");
+      return false;
+    }
+    if (!cf_options_patch) {
+      logger_->log_trace("No explicit ColumnFamilyOptions was requested");
+      return false;
+    }
+    logger_->log_trace("Could not determine if we definitely need to reopen or we are definitely safe, requesting reopen");
+    return true;
+  }();
+  if (need_reopen) {
+    // reset impl_, for the database to be reopened on the next RocksDbInstance::open call
+    invalidate(db_guard);
+  }
+}
+
+void RocksDbInstance::unregisterColumnConfig(const std::string& column) {
+  std::lock_guard<std::mutex> db_guard{mtx_};
+  auto it = column_configs_.find(column);

Review Comment:
   good idea, changed



##########
extensions/rocksdb-repos/database/RocksDbInstance.cpp:
##########
@@ -18,56 +18,138 @@
 
 #include "RocksDbInstance.h"
 #include <vector>
+#include <utility>
 #include "logging/LoggerConfiguration.h"
 #include "rocksdb/utilities/options_util.h"
 #include "OpenRocksDb.h"
 #include "ColumnHandle.h"
+#include "DbHandle.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace internal {
+namespace org::apache::nifi::minifi::internal {
 
 std::shared_ptr<core::logging::Logger> RocksDbInstance::logger_ = core::logging::LoggerFactory<RocksDbInstance>::getLogger();
 
-RocksDbInstance::RocksDbInstance(const std::string& path, RocksDbMode mode) : db_name_(path), mode_(mode) {}
+RocksDbInstance::RocksDbInstance(std::string path, RocksDbMode mode) : db_name_(std::move(path)), mode_(mode) {}
 
 void RocksDbInstance::invalidate() {
   std::lock_guard<std::mutex> db_guard{mtx_};
+  invalidate(db_guard);
+}
+
+void RocksDbInstance::invalidate(const std::lock_guard<std::mutex>&) {
   // discard our own instance
   columns_.clear();
   impl_.reset();
 }
 
-std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) {
+void RocksDbInstance::registerColumnConfig(const std::string& column, const DBOptionsPatch& db_options_patch, const ColumnFamilyOptionsPatch& cf_options_patch) {
+  std::lock_guard<std::mutex> db_guard{mtx_};
+  logger_->log_trace("Registering column '%s' in database '%s'", column, db_name_);
+  auto it = column_configs_.find(column);

Review Comment:
   good idea, changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org