You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "lordgamez (via GitHub)" <gi...@apache.org> on 2023/03/01 14:31:15 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1516: MINIFICPP-2054 - Periodically run manual compaction

lordgamez commented on code in PR #1516:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1516#discussion_r1121794460


##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -42,6 +43,21 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");
 
+  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+  if (auto compaction_period_str = configuration->get(Configure::nifi_dbcontent_repository_compaction_period)) {
+    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+      compaction_period_ = compaction_period->getMilliseconds();
+      if (compaction_period_.count() == 0) {
+        logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_compaction_period);
+      }
+    } else {
+      logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_dbcontent_repository_compaction_period);
+    }
+  } else {
+    logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+  }
+

Review Comment:
   Remove additional newline



##########
extensions/rocksdb-repos/DatabaseContentRepository.h:
##########
@@ -69,12 +72,23 @@ class DatabaseContentRepository : public core::ContentRepository {
 
   void clearOrphans() override;
 
+  void start() override;
+  void stop() override;
+
+  // test-only utility
+  void test_deinitialize();

Review Comment:
   It may be better to create a `TestDatabaseContentRepository` class in the test codebase that inherits from this one, and to implement the test-only functions there. Also, please use camelCase here.



##########
extensions/rocksdb-repos/DatabaseContentRepository.cpp:
##########
@@ -42,6 +43,21 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu
   const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
   logger_->log_info("Using %s DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");
 
+  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+  if (auto compaction_period_str = configuration->get(Configure::nifi_dbcontent_repository_compaction_period)) {
+    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+      compaction_period_ = compaction_period->getMilliseconds();
+      if (compaction_period_.count() == 0) {
+        logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_compaction_period);
+      }
+    } else {
+      logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_dbcontent_repository_compaction_period);
+    }
+  } else {
+    logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+  }

Review Comment:
   This could be extracted to a function like `setCompactionPeriod`



##########
libminifi/include/utils/StoppableThread.h:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include <atomic>
+#include <functional>
+
+namespace org::apache::nifi::minifi::utils {
+
+// mimics some aspects of std::jthread
+// unfortunately clang's jthread support is lacking
+// TODO(adebreceni): replace this with std::jthread
+class StoppableThread {
+ public:
+  explicit StoppableThread(std::function<void()> fn);
+
+  void stopWait() {
+    running_ = false;
+    {
+      std::unique_lock lock(mtx_);
+      cv_.notify_all();
+    }
+    if (thread_.joinable()) {
+      thread_.join();
+    }
+  }
+
+  ~StoppableThread() {
+    stopWait();
+  }
+
+  // return true if stop was requested
+  static bool wait_stop_requested(std::chrono::milliseconds time);

Review Comment:
   Maybe a name like `waitForStopRequest` would be clearer. Also, please use camelCase.



##########
extensions/rocksdb-repos/FlowFileRepository.cpp:
##########
@@ -200,6 +200,20 @@ bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure)
   }
   logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
 
+  compaction_period_ = DEFAULT_COMPACTION_PERIOD;
+  if (auto compaction_period_str = configure->get(Configure::nifi_flowfile_repository_compaction_period)) {
+    if (auto compaction_period = TimePeriodValue::fromString(compaction_period_str.value())) {
+      compaction_period_ = compaction_period->getMilliseconds();
+      if (compaction_period_.count() == 0) {
+        logger_->log_warn("Setting '%s' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_compaction_period);
+      }
+    } else {
+      logger_->log_error("Malformed property '%s', expected time period, using default", Configure::nifi_flowfile_repository_compaction_period);
+    }
+  } else {
+    logger_->log_debug("Using default compaction period of %" PRId64 " ms", int64_t{compaction_period_.count()});
+  }

Review Comment:
   Please extract to a function.



##########
libminifi/include/properties/Configuration.h:
##########
@@ -69,6 +69,11 @@ class Configuration : public Properties {
   static constexpr const char *nifi_provenance_repository_directory_default = "nifi.provenance.repository.directory.default";
   static constexpr const char *nifi_flowfile_repository_directory_default = "nifi.flowfile.repository.directory.default";
   static constexpr const char *nifi_dbcontent_repository_directory_default = "nifi.database.content.repository.directory.default";
+
+  // these are internal properties related to the rocksdb backend
+  static constexpr const char *nifi_flowfile_repository_compaction_period = "nifi.flowfile.repository.compaction.period";
+  static constexpr const char *nifi_dbcontent_repository_compaction_period = "nifi.database.content.repository.compaction.period";

Review Comment:
   These should also be added to the `CONFIGURATION_PROPERTIES` vector so that they are configurable through C2. Should they also be added to the default minifi.properties file as commented-out properties so that they are visible to the user?



##########
libminifi/include/utils/StoppableThread.h:
##########
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include <atomic>
+#include <functional>
+
+namespace org::apache::nifi::minifi::utils {
+
+// mimics some aspects of std::jthread
+// unfortunately clang's jthread support is lacking
+// TODO(adebreceni): replace this with std::jthread
+class StoppableThread {
+ public:
+  explicit StoppableThread(std::function<void()> fn);
+
+  void stopWait() {

Review Comment:
   Wouldn't the name `stop()` be clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org