You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2023/01/17 15:14:08 UTC
[nifi-minifi-cpp] 03/03: MINIFICPP-1972 - Refactor State Manager code
This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 00d145e08cf1f04710d56a133a3cd94b1c3a6b09
Author: Adam Markovics <nu...@gmail.com>
AuthorDate: Mon Oct 24 11:47:23 2022 +0200
MINIFICPP-1972 - Refactor State Manager code
Co-authored-by: adamdebreceni <64...@users.noreply.github.com>
Co-authored-by: Ferenc Gerlits <fg...@gmail.com>
Signed-off-by: Gabor Gyimesi <ga...@gmail.com>
This closes #1443
---
.../SourceInitiatedSubscriptionListener.h | 2 +-
...lueStoreService.cpp => RocksDbStateStorage.cpp} | 74 ++++---
...eyValueStoreService.h => RocksDbStateStorage.h} | 36 ++--
extensions/sftp/processors/ListSFTP.h | 4 +-
extensions/sql/processors/QueryDatabaseTable.h | 2 +-
.../controllers/InMemoryKeyValueStorage.cpp | 77 +++++++
.../controllers/InMemoryKeyValueStorage.h | 45 ++---
...reService.cpp => PersistentMapStateStorage.cpp} | 120 ++++++-----
...eStoreService.h => PersistentMapStateStorage.h} | 47 ++---
.../UnorderedMapKeyValueStoreService.cpp | 108 ----------
.../controllers/VolatileMapStateStorage.cpp | 75 +++++++
...lueStoreService.h => VolatileMapStateStorage.h} | 26 ++-
.../standard-processors/processors/TailFile.h | 2 +-
extensions/systemd/ConsumeJournald.h | 4 +-
extensions/windows-event-log/Bookmark.cpp | 2 +-
extensions/windows-event-log/Bookmark.h | 4 +-
.../windows-event-log/ConsumeWindowsEventLog.h | 2 +-
.../windows-event-log/tests/BookmarkTests.cpp | 22 +-
.../AbstractCoreComponentStateManagerProvider.h | 87 --------
...stingKeyValueStoreService.h => AutoPersistor.h} | 45 ++---
.../controllers/keyvalue/KeyValueStateManager.h | 60 ++++++
...yValueStoreService.h => KeyValueStateStorage.h} | 55 +++--
.../keyvalue/PersistableKeyValueStoreService.h | 49 -----
libminifi/include/core/CoreComponentState.h | 82 --------
libminifi/include/core/ProcessContext.h | 108 +++++-----
libminifi/include/core/ProcessSession.h | 2 +-
libminifi/include/core/StateManager.h | 67 +++++++
.../core/StateStorage.h} | 36 ++--
libminifi/include/properties/Configuration.h | 15 +-
libminifi/include/properties/Configure.h | 17 +-
libminifi/include/utils/ListingStateManager.h | 6 +-
libminifi/src/Configuration.cpp | 15 +-
libminifi/src/Configure.cpp | 18 +-
libminifi/src/c2/C2Agent.cpp | 18 +-
.../AbstractCoreComponentStateManagerProvider.cpp | 222 ---------------------
...gKeyValueStoreService.cpp => AutoPersistor.cpp} | 59 ++----
.../controllers/keyvalue/KeyValueStateManager.cpp | 145 ++++++++++++++
.../controllers/keyvalue/KeyValueStateStorage.cpp | 104 ++++++++++
.../keyvalue/PersistableKeyValueStoreService.cpp | 63 ------
libminifi/test/StatefulProcessor.h | 6 +-
libminifi/test/TestBase.cpp | 10 +-
libminifi/test/TestBase.h | 8 +-
libminifi/test/flow-tests/CycleTest.cpp | 4 +-
libminifi/test/flow-tests/FlowControllerTests.cpp | 4 +-
libminifi/test/flow-tests/LoopTest.cpp | 4 +-
libminifi/test/flow-tests/MultiLoopTest.cpp | 4 +-
libminifi/test/integration/IntegrationBase.h | 8 +-
.../integration/StateTransactionalityTests.cpp | 118 +++++------
libminifi/test/keyvalue-tests/CMakeLists.txt | 6 +-
...viceTest.cpp => PersistentStateStorageTest.cpp} | 34 ++--
...iceTest.cpp => VolatileMapStateStorageTest.cpp} | 33 ++-
...rviceTest.yml => PersistentMapStateStorage.yml} | 2 +-
...toreServiceTest.yml => RocksDbStateStorage.yml} | 2 +-
...ServiceTest.yml => VolatileMapStateStorage.yml} | 2 +-
54 files changed, 1043 insertions(+), 1127 deletions(-)
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index be72d8c1d..8dede17d8 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -128,7 +128,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
protected:
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<SourceInitiatedSubscriptionListener>::getLogger();
- core::CoreComponentStateManager* state_manager_;
+ core::StateManager* state_manager_;
std::shared_ptr<core::ProcessSessionFactory> session_factory_;
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
similarity index 68%
rename from extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
rename to extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
index e4b38f884..9227e9a60 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.cpp
@@ -15,10 +15,11 @@
* limitations under the License.
*/
+#include <cinttypes>
#include <fstream>
#include <utility>
-#include "RocksDbPersistableKeyValueStoreService.h"
+#include "RocksDbStateStorage.h"
#include "../encryption/RocksDbEncryptionProvider.h"
#include "utils/StringUtils.h"
#include "core/PropertyBuilder.h"
@@ -26,55 +27,63 @@
namespace org::apache::nifi::minifi::controllers {
-const core::Property RocksDbPersistableKeyValueStoreService::LinkedServices(
- core::PropertyBuilder::createProperty("Linked Services")
- ->withDescription("Referenced Controller Services")
- ->build());
-const core::Property RocksDbPersistableKeyValueStoreService::AlwaysPersist(
- core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName)
+const core::Property RocksDbStateStorage::AlwaysPersist(
+ core::PropertyBuilder::createProperty(ALWAYS_PERSIST_PROPERTY_NAME)
->withDescription("Persist every change instead of persisting it periodically.")
->isRequired(false)
->withDefaultValue<bool>(false)
->build());
-const core::Property RocksDbPersistableKeyValueStoreService::AutoPersistenceInterval(
- core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName)
+const core::Property RocksDbStateStorage::AutoPersistenceInterval(
+ core::PropertyBuilder::createProperty(AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME)
->withDescription("The interval of the periodic task persisting all values. Only used if Always Persist is false. If set to 0 seconds, auto persistence will be disabled.")
->isRequired(false)
->withDefaultValue<core::TimePeriodValue>("1 min")
->build());
-const core::Property RocksDbPersistableKeyValueStoreService::Directory(
+const core::Property RocksDbStateStorage::Directory(
core::PropertyBuilder::createProperty("Directory")
->withDescription("Path to a directory for the database")
->isRequired(true)
->build());
-RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : PersistableKeyValueStoreService(name, uuid)
- , AbstractAutoPersistingKeyValueStoreService(std::move(name), uuid) {
+RocksDbStateStorage::RocksDbStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+ : KeyValueStateStorage(name, uuid) {
+}
+
+RocksDbStateStorage::~RocksDbStateStorage() {
+ auto_persistor_.stop();
}
-void RocksDbPersistableKeyValueStoreService::initialize() {
+void RocksDbStateStorage::initialize() {
ControllerService::initialize();
setSupportedProperties(properties());
}
-void RocksDbPersistableKeyValueStoreService::onEnable() {
+void RocksDbStateStorage::onEnable() {
if (configuration_ == nullptr) {
- logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+ logger_->log_debug("Cannot enable RocksDbStateStorage");
return;
}
- AbstractAutoPersistingKeyValueStoreService::onEnable();
+ const auto always_persist = getProperty<bool>(AlwaysPersist.getName()).value_or(false);
+ logger_->log_info("Always Persist property: %s", always_persist ? "true" : "false");
+
+ const auto auto_persistence_interval = getProperty<core::TimePeriodValue>(AutoPersistenceInterval.getName()).value_or(core::TimePeriodValue{}).getMilliseconds();
+ logger_->log_info("Auto Persistence Interval property: %" PRId64 " ms", auto_persistence_interval.count());
if (!getProperty(Directory.getName(), directory_)) {
logger_->log_error("Invalid or missing property: Directory");
return;
}
+ auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); });
+
db_.reset();
- const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
- logger_->log_info("Using %s RocksDbPersistableKeyValueStoreService", encrypted_env ? "encrypted" : "plaintext");
+ auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+ if (!encrypted_env) {
+ encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration_->getHome()}, core::repository::DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME_OLD});
+ }
+ logger_->log_info("Using %s RocksDbStateStorage", encrypted_env ? "encrypted" : "plaintext");
auto set_db_opts = [encrypted_env] (internal::Writable<rocksdb::DBOptions>& db_opts) {
db_opts.set(&rocksdb::DBOptions::create_if_missing, true);
@@ -100,20 +109,19 @@ void RocksDbPersistableKeyValueStoreService::onEnable() {
return;
}
- if (always_persist_) {
+ if (auto_persistor_.isAlwaysPersisting()) {
default_write_options.sync = true;
}
- logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
+ logger_->log_trace("Enabled RocksDbStateStorage");
}
-void RocksDbPersistableKeyValueStoreService::notifyStop() {
- AbstractAutoPersistingKeyValueStoreService::notifyStop();
-
+void RocksDbStateStorage::notifyStop() {
+ auto_persistor_.stop();
db_.reset();
}
-bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
+bool RocksDbStateStorage::set(const std::string& key, const std::string& value) {
if (!db_) {
return false;
}
@@ -129,7 +137,7 @@ bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const s
return true;
}
-bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::string& value) {
+bool RocksDbStateStorage::get(const std::string& key, std::string& value) {
if (!db_) {
return false;
}
@@ -149,7 +157,7 @@ bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::st
return true;
}
-bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
+bool RocksDbStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
if (!db_) {
return false;
}
@@ -169,7 +177,7 @@ bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string,
return true;
}
-bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
+bool RocksDbStateStorage::remove(const std::string& key) {
if (!db_) {
return false;
}
@@ -185,7 +193,7 @@ bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
return true;
}
-bool RocksDbPersistableKeyValueStoreService::clear() {
+bool RocksDbStateStorage::clear() {
if (!db_) {
return false;
}
@@ -208,7 +216,7 @@ bool RocksDbPersistableKeyValueStoreService::clear() {
return true;
}
-bool RocksDbPersistableKeyValueStoreService::update(const std::string& /*key*/, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& /*update_func*/) {
+bool RocksDbStateStorage::update(const std::string& /*key*/, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& /*update_func*/) {
if (!db_) {
return false;
}
@@ -219,7 +227,7 @@ bool RocksDbPersistableKeyValueStoreService::update(const std::string& /*key*/,
throw std::logic_error("Unsupported method");
}
-bool RocksDbPersistableKeyValueStoreService::persist() {
+bool RocksDbStateStorage::persistNonVirtual() {
if (!db_) {
return false;
}
@@ -227,12 +235,12 @@ bool RocksDbPersistableKeyValueStoreService::persist() {
if (!opendb) {
return false;
}
- if (always_persist_) {
+ if (auto_persistor_.isAlwaysPersisting()) {
return true;
}
return opendb->FlushWAL(true /*sync*/).ok();
}
-REGISTER_RESOURCE_AS(RocksDbPersistableKeyValueStoreService, ControllerService, ("RocksDbPersistableKeyValueStoreService", "rocksdbpersistablekeyvaluestoreservice"));
+REGISTER_RESOURCE_AS(RocksDbStateStorage, ControllerService, ("RocksDbPersistableKeyValueStoreService", "rocksdbpersistablekeyvaluestoreservice", "RocksDbStateStorage"));
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
similarity index 74%
rename from extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
rename to extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
index 4c6a9a2f7..69e290ee4 100644
--- a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
+++ b/extensions/rocksdb-repos/controllers/RocksDbStateStorage.h
@@ -20,7 +20,8 @@
#include <string>
#include <memory>
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
@@ -30,27 +31,28 @@
namespace org::apache::nifi::minifi::controllers {
-class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService {
+class RocksDbStateStorage : public KeyValueStateStorage {
public:
- static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.state.management.provider.local.encryption.key";
+ static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.state.storage.local.encryption.key";
+ static constexpr const char* ENCRYPTION_KEY_NAME_OLD = "nifi.state.management.provider.local.encryption.key";
- explicit RocksDbPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
+ explicit RocksDbStateStorage(const std::string& name, const utils::Identifier& uuid = {});
- ~RocksDbPersistableKeyValueStoreService() override = default;
+ ~RocksDbStateStorage() override;
+
+ EXTENSIONAPI static constexpr const char* Description = "A state storage service implemented by RocksDB";
- EXTENSIONAPI static constexpr const char* Description = "A key-value service implemented by RocksDB";
- EXTENSIONAPI static const core::Property LinkedServices;
EXTENSIONAPI static const core::Property AlwaysPersist;
EXTENSIONAPI static const core::Property AutoPersistenceInterval;
EXTENSIONAPI static const core::Property Directory;
static auto properties() {
return std::array{
- LinkedServices,
AlwaysPersist,
AutoPersistenceInterval,
Directory
};
}
+
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
@@ -59,27 +61,25 @@ class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyV
void notifyStop() override;
bool set(const std::string& key, const std::string& value) override;
-
bool get(const std::string& key, std::string& value) override;
-
bool get(std::unordered_map<std::string, std::string>& kvs) override;
-
bool remove(const std::string& key) override;
-
bool clear() override;
-
bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+ bool persist() override {
+ return persistNonVirtual();
+ }
- bool persist() override;
+ private:
+ // non-virtual to allow calling on AutoPersistor's thread during destruction
+ bool persistNonVirtual();
- protected:
std::string directory_;
-
std::unique_ptr<minifi::internal::RocksDatabase> db_;
rocksdb::WriteOptions default_write_options;
+ AutoPersistor auto_persistor_;
- private:
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger();
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RocksDbStateStorage>::getLogger();
};
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index 85e8cea39..d4725b43d 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -33,7 +33,7 @@
#include "core/Property.h"
#include "utils/ArrayUtils.h"
#include "utils/Id.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::processors {
@@ -119,7 +119,7 @@ class ListSFTP : public SFTPProcessorBase {
void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
private:
- core::CoreComponentStateManager* state_manager_{};
+ core::StateManager* state_manager_{};
std::string listing_strategy_;
bool search_recursively_{};
bool follow_symlink_{};
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
index f510e0953..766325394 100644
--- a/extensions/sql/processors/QueryDatabaseTable.h
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -83,7 +83,7 @@ class QueryDatabaseTable: public SQLProcessor, public FlowFileSource {
bool saveState();
- core::CoreComponentStateManager* state_manager_{};
+ core::StateManager* state_manager_{};
std::string table_name_;
std::unordered_set<sql::SQLColumnIdentifier> return_columns_;
std::string queried_columns_;
diff --git a/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp
new file mode 100644
index 000000000..55bc50dca
--- /dev/null
+++ b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.cpp
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "InMemoryKeyValueStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+bool InMemoryKeyValueStorage::set(const std::string& key, const std::string& value) {
+ map_[key] = value;
+ return true;
+}
+
+bool InMemoryKeyValueStorage::get(const std::string& key, std::string& value) {
+ auto it = map_.find(key);
+ if (it == map_.end()) {
+ return false;
+ } else {
+ value = it->second;
+ return true;
+ }
+}
+
+bool InMemoryKeyValueStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+ kvs = map_;
+ return true;
+}
+
+bool InMemoryKeyValueStorage::remove(const std::string& key) {
+ return map_.erase(key) == 1U;
+}
+
+bool InMemoryKeyValueStorage::clear() {
+ map_.clear();
+ return true;
+}
+
+bool InMemoryKeyValueStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+ bool exists = false;
+ std::string value;
+ auto it = map_.find(key);
+ if (it != map_.end()) {
+ exists = true;
+ value = it->second;
+ }
+ try {
+ if (!update_func(exists, value)) {
+ return false;
+ }
+ } catch (const std::exception& e) {
+ logger_->log_error("update_func failed with an exception: %s", e.what());
+ return false;
+ } catch (...) {
+ logger_->log_error("update_func failed with an exception");
+ return false;
+ }
+ if (!exists) {
+ it = map_.emplace(key, "").first;
+ }
+ it->second = std::move(value);
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.h
similarity index 52%
copy from libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
copy to extensions/standard-processors/controllers/InMemoryKeyValueStorage.h
index 35a87bb03..1ca503689 100644
--- a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++ b/extensions/standard-processors/controllers/InMemoryKeyValueStorage.h
@@ -14,49 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#pragma once
+#pragma once
-#include <string>
-#include <thread>
-#include <mutex>
+#include <functional>
#include <memory>
+#include <string>
+#include <unordered_map>
#include <utility>
-#include "PersistableKeyValueStoreService.h"
-#include "core/Core.h"
-#include "properties/Configure.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
-#include "utils/Export.h"
namespace org::apache::nifi::minifi::controllers {
-class AbstractAutoPersistingKeyValueStoreService : virtual public PersistableKeyValueStoreService {
+class InMemoryKeyValueStorage {
public:
- explicit AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-
- ~AbstractAutoPersistingKeyValueStoreService() override;
-
- static constexpr const char* AlwaysPersistPropertyName = "Always Persist";
- static constexpr const char* AutoPersistenceIntervalPropertyName = "Auto Persistence Interval";
+ InMemoryKeyValueStorage() = default;
+ explicit InMemoryKeyValueStorage(std::unordered_map<std::string, std::string> contents) : map_{std::move(contents)} {}
- void onEnable() override;
- void notifyStop() override;
-
- protected:
- bool always_persist_;
- std::chrono::milliseconds auto_persistence_interval_;
-
- std::thread persisting_thread_;
- bool running_;
- std::mutex persisting_mutex_;
- std::condition_variable persisting_cv_;
- void persistingThreadFunc();
+ bool set(const std::string& key, const std::string& value);
+ bool get(const std::string& key, std::string& value);
+ bool get(std::unordered_map<std::string, std::string>& kvs);
+ bool remove(const std::string& key);
+ bool clear();
+ bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func);
private:
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger();
-
- void stopPersistingThread();
+ std::unordered_map<std::string, std::string> map_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<InMemoryKeyValueStorage>::getLogger();
};
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp b/extensions/standard-processors/controllers/PersistentMapStateStorage.cpp
similarity index 59%
rename from extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
rename to extensions/standard-processors/controllers/PersistentMapStateStorage.cpp
index 309a53a71..d771a53b5 100644
--- a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
+++ b/extensions/standard-processors/controllers/PersistentMapStateStorage.cpp
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-#include "UnorderedMapPersistableKeyValueStoreService.h"
+#include "PersistentMapStateStorage.h"
+#include <cinttypes>
#include <fstream>
#include <set>
@@ -50,46 +51,39 @@ namespace {
namespace org::apache::nifi::minifi::controllers {
-const core::Property UnorderedMapPersistableKeyValueStoreService::LinkedServices(
- core::PropertyBuilder::createProperty("Linked Services")
- ->withDescription("Referenced Controller Services")
- ->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::AlwaysPersist(
- core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName)
+const core::Property PersistentMapStateStorage::AlwaysPersist(
+ core::PropertyBuilder::createProperty(ALWAYS_PERSIST_PROPERTY_NAME)
->withDescription("Persist every change instead of persisting it periodically.")
->isRequired(false)
->withDefaultValue<bool>(false)
->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::AutoPersistenceInterval(
- core::PropertyBuilder::createProperty(AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName)
+const core::Property PersistentMapStateStorage::AutoPersistenceInterval(
+ core::PropertyBuilder::createProperty(AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME)
->withDescription("The interval of the periodic task persisting all values. Only used if Always Persist is false. If set to 0 seconds, auto persistence will be disabled.")
->isRequired(false)
->withDefaultValue<core::TimePeriodValue>("1 min")
->build());
-const core::Property UnorderedMapPersistableKeyValueStoreService::File(
+const core::Property PersistentMapStateStorage::File(
core::PropertyBuilder::createProperty("File")
->withDescription("Path to a file to store state")
->isRequired(true)
->build());
-UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : PersistableKeyValueStoreService(name, uuid)
- , AbstractAutoPersistingKeyValueStoreService(name, uuid)
- , UnorderedMapKeyValueStoreService(std::move(name), uuid) {
+PersistentMapStateStorage::PersistentMapStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+ : KeyValueStateStorage(name, uuid) {
}
-UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(std::string name, const std::shared_ptr<Configure> &configuration)
- : PersistableKeyValueStoreService(name)
- , AbstractAutoPersistingKeyValueStoreService(name)
- , UnorderedMapKeyValueStoreService(std::move(name)) {
+PersistentMapStateStorage::PersistentMapStateStorage(const std::string& name, const std::shared_ptr<Configure> &configuration)
+ : KeyValueStateStorage(name) {
setConfiguration(configuration);
}
-UnorderedMapPersistableKeyValueStoreService::~UnorderedMapPersistableKeyValueStoreService() {
+PersistentMapStateStorage::~PersistentMapStateStorage() {
+ auto_persistor_.stop();
persistNonVirtual();
}
-bool UnorderedMapPersistableKeyValueStoreService::parseLine(const std::string& line, std::string& key, std::string& value) {
+bool PersistentMapStateStorage::parseLine(const std::string& line, std::string& key, std::string& value) {
std::stringstream key_ss;
std::stringstream value_ss;
bool in_escape_sequence = false;
@@ -144,18 +138,24 @@ bool UnorderedMapPersistableKeyValueStoreService::parseLine(const std::string& l
return true;
}
-void UnorderedMapPersistableKeyValueStoreService::initialize() {
- // UnorderedMapKeyValueStoreService::initialize() also calls setSupportedProperties, and we don't want that
+void PersistentMapStateStorage::initialize() {
+ // VolatileMapStateStorage::initialize() also calls setSupportedProperties, and we don't want that
ControllerService::initialize(); // NOLINT(bugprone-parent-virtual-call)
setSupportedProperties(properties());
}
-void UnorderedMapPersistableKeyValueStoreService::onEnable() {
+void PersistentMapStateStorage::onEnable() {
if (configuration_ == nullptr) {
- logger_->log_debug("Cannot enable UnorderedMapPersistableKeyValueStoreService");
+ logger_->log_debug("Cannot enable PersistentMapStateStorage");
return;
}
+ const auto always_persist = getProperty<bool>(AlwaysPersist.getName()).value_or(false);
+ logger_->log_info("Always Persist property: %s", always_persist ? "true" : "false");
+
+ const auto auto_persistence_interval = getProperty<core::TimePeriodValue>(AutoPersistenceInterval.getName()).value_or(core::TimePeriodValue{}).getMilliseconds();
+ logger_->log_info("Auto Persistence Interval property: %" PRId64 " ms", auto_persistence_interval.count());
+
if (!getProperty(File.getName(), file_)) {
logger_->log_error("Invalid or missing property: File");
return;
@@ -164,68 +164,85 @@ void UnorderedMapPersistableKeyValueStoreService::onEnable() {
/* We must not start the persistence thread until we attempted to load the state */
load();
- AbstractAutoPersistingKeyValueStoreService::onEnable();
+ auto_persistor_.start(always_persist, auto_persistence_interval, [this] { return persistNonVirtual(); });
- logger_->log_trace("Enabled UnorderedMapPersistableKeyValueStoreService");
+ logger_->log_trace("Enabled PersistentMapStateStorage");
}
-void UnorderedMapPersistableKeyValueStoreService::notifyStop() {
- AbstractAutoPersistingKeyValueStoreService::notifyStop();
+void PersistentMapStateStorage::notifyStop() {
+ auto_persistor_.stop();
persist();
}
-bool UnorderedMapPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- bool res = UnorderedMapKeyValueStoreService::set(key, value);
- if (always_persist_ && res) {
+bool PersistentMapStateStorage::set(const std::string& key, const std::string& value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ bool res = storage_.set(key, value);
+ if (auto_persistor_.isAlwaysPersisting() && res) {
return persist();
}
return res;
}
-bool UnorderedMapPersistableKeyValueStoreService::remove(const std::string& key) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- bool res = UnorderedMapKeyValueStoreService::remove(key);
- if (always_persist_ && res) {
+bool PersistentMapStateStorage::get(const std::string& key, std::string& value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.get(key, value);
+}
+
+bool PersistentMapStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.get(kvs);
+}
+
+bool PersistentMapStateStorage::remove(const std::string& key) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ bool res = storage_.remove(key);
+ if (auto_persistor_.isAlwaysPersisting() && res) {
return persist();
}
return res;
}
-bool UnorderedMapPersistableKeyValueStoreService::clear() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- bool res = UnorderedMapKeyValueStoreService::clear();
- if (always_persist_ && res) {
+bool PersistentMapStateStorage::clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ bool res = storage_.clear();
+ if (auto_persistor_.isAlwaysPersisting() && res) {
return persist();
}
return res;
}
-bool UnorderedMapPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- bool res = UnorderedMapKeyValueStoreService::update(key, update_func);
- if (always_persist_ && res) {
+bool PersistentMapStateStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ bool res = storage_.update(key, update_func);
+ if (auto_persistor_.isAlwaysPersisting() && res) {
return persist();
}
return res;
}
-bool UnorderedMapPersistableKeyValueStoreService::persistNonVirtual() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+bool PersistentMapStateStorage::persistNonVirtual() {
+ std::lock_guard<std::mutex> lock(mutex_);
std::ofstream ofs(file_);
if (!ofs.is_open()) {
logger_->log_error("Failed to open file \"%s\" to store state", file_.c_str());
return false;
}
+ std::unordered_map<std::string, std::string> storage_copy;
+ if (!storage_.get(storage_copy)) {
+ logger_->log_error("Could not read the contents of the in-memory storage");
+ return false;
+ }
+
ofs << escape(FORMAT_VERSION_KEY) << "=" << escape(std::to_string(FORMAT_VERSION)) << "\n";
- for (const auto& kv : map_) {
+
+ for (const auto& kv : storage_copy) {
ofs << escape(kv.first) << "=" << escape(kv.second) << "\n";
}
return true;
}
-bool UnorderedMapPersistableKeyValueStoreService::load() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
+bool PersistentMapStateStorage::load() {
+ std::lock_guard<std::mutex> lock(mutex_);
std::ifstream ifs(file_);
if (!ifs.is_open()) {
logger_->log_debug("Failed to open file \"%s\" to load state", file_.c_str());
@@ -255,11 +272,12 @@ bool UnorderedMapPersistableKeyValueStoreService::load() {
map[key] = value;
}
}
- map_ = std::move(map);
+
+ storage_ = InMemoryKeyValueStorage{std::move(map)};
logger_->log_debug("Loaded state from \"%s\"", file_.c_str());
return true;
}
-REGISTER_RESOURCE(UnorderedMapPersistableKeyValueStoreService, ControllerService);
+REGISTER_RESOURCE_AS(PersistentMapStateStorage, ControllerService, ("UnorderedMapPersistableKeyValueStoreService", "PersistentMapStateStorage"));
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h b/extensions/standard-processors/controllers/PersistentMapStateStorage.h
similarity index 63%
rename from extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
rename to extensions/standard-processors/controllers/PersistentMapStateStorage.h
index d0675c25f..8d4496418 100644
--- a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
+++ b/extensions/standard-processors/controllers/PersistentMapStateStorage.h
@@ -22,37 +22,36 @@
#include <memory>
#include <utility>
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
-#include "UnorderedMapKeyValueStoreService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
#include "core/Core.h"
#include "properties/Configure.h"
+#include "InMemoryKeyValueStorage.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
namespace org::apache::nifi::minifi::controllers {
-class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService,
- public UnorderedMapKeyValueStoreService {
+class PersistentMapStateStorage : public KeyValueStateStorage {
public:
- explicit UnorderedMapPersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
- explicit UnorderedMapPersistableKeyValueStoreService(std::string name, const std::shared_ptr<Configure>& configuration);
+ explicit PersistentMapStateStorage(const std::string& name, const utils::Identifier& uuid = {});
+ explicit PersistentMapStateStorage(const std::string& name, const std::shared_ptr<Configure>& configuration);
- ~UnorderedMapPersistableKeyValueStoreService() override;
+ ~PersistentMapStateStorage() override;
+
+ EXTENSIONAPI static constexpr const char* Description = "A persistable state storage service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file";
- EXTENSIONAPI static constexpr const char* Description = "A persistable key-value service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file";
- EXTENSIONAPI static const core::Property LinkedServices;
EXTENSIONAPI static const core::Property AlwaysPersist;
EXTENSIONAPI static const core::Property AutoPersistenceInterval;
- EXTENSIONAPI static const core::Property Directory;
EXTENSIONAPI static const core::Property File;
static auto properties() {
return std::array{
- LinkedServices,
AlwaysPersist,
AutoPersistenceInterval,
File
};
}
+
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
@@ -61,39 +60,31 @@ class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistin
void notifyStop() override;
bool set(const std::string& key, const std::string& value) override;
-
+ bool get(const std::string& key, std::string& value) override;
+ bool get(std::unordered_map<std::string, std::string>& kvs) override;
bool remove(const std::string& key) override;
-
bool clear() override;
-
bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
bool persist() override {
return persistNonVirtual();
}
- protected:
- using AbstractAutoPersistingKeyValueStoreService::getImpl;
- using AbstractAutoPersistingKeyValueStoreService::setImpl;
- using AbstractAutoPersistingKeyValueStoreService::persistImpl;
- using AbstractAutoPersistingKeyValueStoreService::removeImpl;
-
-
+ private:
static constexpr const char* FORMAT_VERSION_KEY = "__UnorderedMapPersistableKeyValueStoreService_FormatVersion";
static constexpr int FORMAT_VERSION = 1;
- std::string file_;
-
bool load();
-
bool parseLine(const std::string& line, std::string& key, std::string& value);
- private:
+ // non-virtual to allow calling in destructor
bool persistNonVirtual();
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger();
+ std::mutex mutex_;
+ std::string file_;
+ InMemoryKeyValueStorage storage_;
+ AutoPersistor auto_persistor_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PersistentMapStateStorage>::getLogger();
};
-static_assert(std::is_convertible<UnorderedMapKeyValueStoreService*, PersistableKeyValueStoreService*>::value, "UnorderedMapKeyValueStoreService is a PersistableKeyValueStoreService");
-
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
deleted file mode 100644
index ffbd1b241..000000000
--- a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "UnorderedMapKeyValueStoreService.h"
-#include "core/PropertyBuilder.h"
-#include "core/Resource.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-const core::Property UnorderedMapKeyValueStoreService::LinkedServices(
- core::PropertyBuilder::createProperty("Linked Services")
- ->withDescription("Referenced Controller Services")
- ->build());
-
-UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : PersistableKeyValueStoreService(std::move(name), uuid) {
-}
-
-UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(std::string name, const std::shared_ptr<Configure> &configuration)
- : PersistableKeyValueStoreService(std::move(name)) {
- setConfiguration(configuration);
-}
-
-UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() = default;
-
-bool UnorderedMapKeyValueStoreService::set(const std::string& key, const std::string& value) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- map_[key] = value;
- return true;
-}
-
-bool UnorderedMapKeyValueStoreService::get(const std::string& key, std::string& value) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- auto it = map_.find(key);
- if (it == map_.end()) {
- return false;
- } else {
- value = it->second;
- return true;
- }
-}
-
-bool UnorderedMapKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- kvs = map_;
- return true;
-}
-
-bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- return map_.erase(key) == 1U;
-}
-
-bool UnorderedMapKeyValueStoreService::clear() {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- map_.clear();
- return true;
-}
-
-void UnorderedMapKeyValueStoreService::initialize() {
- ControllerService::initialize();
- setSupportedProperties(properties());
-}
-
-bool UnorderedMapKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
- std::lock_guard<std::recursive_mutex> lock(mutex_);
- bool exists = false;
- std::string value;
- auto it = map_.find(key);
- if (it != map_.end()) {
- exists = true;
- value = it->second;
- }
- try {
- if (!update_func(exists, value)) {
- return false;
- }
- } catch (const std::exception& e) {
- logger_->log_error("update_func failed with an exception: %s", e.what());
- return false;
- } catch (...) {
- logger_->log_error("update_func failed with an exception");
- return false;
- }
- if (!exists) {
- it = map_.emplace(key, "").first;
- }
- it->second = std::move(value);
- return true;
-}
-
-REGISTER_RESOURCE(UnorderedMapKeyValueStoreService, ControllerService);
-
-} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp b/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp
new file mode 100644
index 000000000..40608cf13
--- /dev/null
+++ b/extensions/standard-processors/controllers/VolatileMapStateStorage.cpp
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "VolatileMapStateStorage.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+const core::Property VolatileMapStateStorage::LinkedServices(
+ core::PropertyBuilder::createProperty("Linked Services")
+ ->withDescription("Referenced Controller Services")
+ ->build());
+
+VolatileMapStateStorage::VolatileMapStateStorage(const std::string& name, const utils::Identifier& uuid /*= utils::Identifier()*/)
+ : KeyValueStateStorage(name, uuid) {
+}
+
+VolatileMapStateStorage::VolatileMapStateStorage(const std::string& name, const std::shared_ptr<Configure> &configuration)
+ : KeyValueStateStorage(name) {
+ setConfiguration(configuration);
+}
+
+void VolatileMapStateStorage::initialize() {
+ ControllerService::initialize();
+ setSupportedProperties(properties());
+}
+
+bool VolatileMapStateStorage::set(const std::string& key, const std::string& value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.set(key, value);
+}
+
+bool VolatileMapStateStorage::get(const std::string& key, std::string& value) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.get(key, value);
+}
+
+bool VolatileMapStateStorage::get(std::unordered_map<std::string, std::string>& kvs) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.get(kvs);
+}
+
+bool VolatileMapStateStorage::remove(const std::string& key) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.remove(key);
+}
+
+bool VolatileMapStateStorage::clear() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.clear();
+}
+
+bool VolatileMapStateStorage::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return storage_.update(key, update_func);
+}
+
+REGISTER_RESOURCE_AS(VolatileMapStateStorage, ControllerService, ("UnorderedMapKeyValueStoreService", "VolatileMapStateStorage"));
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h b/extensions/standard-processors/controllers/VolatileMapStateStorage.h
similarity index 75%
rename from extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
rename to extensions/standard-processors/controllers/VolatileMapStateStorage.h
index baa898daf..952f858f3 100644
--- a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
+++ b/extensions/standard-processors/controllers/VolatileMapStateStorage.h
@@ -22,22 +22,20 @@
#include <memory>
#include <utility>
-#include "controllers/keyvalue/KeyValueStoreService.h"
#include "core/Core.h"
#include "properties/Configure.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "InMemoryKeyValueStorage.h"
namespace org::apache::nifi::minifi::controllers {
-/// Key-value store service purely in RAM without disk usage
-class UnorderedMapKeyValueStoreService : virtual public PersistableKeyValueStoreService {
+/// Key-value state storage purely in RAM without disk usage
+class VolatileMapStateStorage : virtual public KeyValueStateStorage {
public:
- explicit UnorderedMapKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
- explicit UnorderedMapKeyValueStoreService(std::string name, const std::shared_ptr<Configure>& configuration);
-
- ~UnorderedMapKeyValueStoreService() override;
+ explicit VolatileMapStateStorage(const std::string& name, const utils::Identifier& uuid = {});
+ explicit VolatileMapStateStorage(const std::string& name, const std::shared_ptr<Configure>& configuration);
EXTENSIONAPI static constexpr const char* Description = "A key-value service implemented by a locked std::unordered_map<std::string, std::string>";
EXTENSIONAPI static const core::Property LinkedServices;
@@ -45,23 +43,23 @@ class UnorderedMapKeyValueStoreService : virtual public PersistableKeyValueStore
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+ void initialize() override;
+
bool set(const std::string& key, const std::string& value) override;
bool get(const std::string& key, std::string& value) override;
bool get(std::unordered_map<std::string, std::string>& kvs) override;
bool remove(const std::string& key) override;
bool clear() override;
- void initialize() override;
bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
bool persist() override {
return true;
}
- protected:
- std::unordered_map<std::string, std::string> map_;
- std::recursive_mutex mutex_;
-
private:
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger();
+ std::mutex mutex_;
+ InMemoryKeyValueStorage storage_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<VolatileMapStateStorage>::getLogger();
};
} // namespace org::apache::nifi::minifi::controllers
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 49297c979..286ec6b8b 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -201,7 +201,7 @@ class TailFile : public core::Processor {
static const int BUFFER_SIZE = 512;
std::string delimiter_; // Delimiter for the data incoming from the tailed file.
- core::CoreComponentStateManager* state_manager_ = nullptr;
+ core::StateManager* state_manager_ = nullptr;
std::map<std::filesystem::path, TailState> tail_states_;
Mode tail_mode_ = Mode::UNDEFINED;
std::optional<utils::Regex> pattern_regex_;
diff --git a/extensions/systemd/ConsumeJournald.h b/extensions/systemd/ConsumeJournald.h
index 25a70150c..cebb315d1 100644
--- a/extensions/systemd/ConsumeJournald.h
+++ b/extensions/systemd/ConsumeJournald.h
@@ -29,7 +29,7 @@
#include <utility>
#include <vector>
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
#include "core/Processor.h"
#include "core/logging/LoggerConfiguration.h"
#include "libwrapper/LibWrapper.h"
@@ -112,7 +112,7 @@ class ConsumeJournald final : public core::Processor {
std::atomic<bool> running_{false};
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger(uuid_);
- core::CoreComponentStateManager* state_manager_;
+ core::StateManager* state_manager_;
std::unique_ptr<libwrapper::LibWrapper> libwrapper_;
std::unique_ptr<utils::FifoExecutor> worker_;
std::unique_ptr<libwrapper::Journal> journal_;
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index 300aef3c4..74ea2238e 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -35,7 +35,7 @@ Bookmark::Bookmark(const std::wstring& channel,
const std::filesystem::path& bookmarkRootDir,
const utils::Identifier& uuid,
bool processOldEvents,
- core::CoreComponentStateManager* state_manager,
+ core::StateManager* state_manager,
std::shared_ptr<core::logging::Logger> logger)
: logger_(std::move(logger)),
state_manager_(state_manager) {
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 66937df4f..c2f07fb81 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -41,7 +41,7 @@ class Bookmark {
const std::filesystem::path& bookmarkRootDir,
const utils::Identifier& uuid,
bool processOldEvents,
- core::CoreComponentStateManager* state_manager,
+ core::StateManager* state_manager,
std::shared_ptr<core::logging::Logger> logger);
~Bookmark();
explicit operator bool() const noexcept;
@@ -57,7 +57,7 @@ class Bookmark {
using unique_evt_handle = wel::unique_evt_handle;
std::shared_ptr<core::logging::Logger> logger_;
- core::CoreComponentStateManager* state_manager_;
+ core::StateManager* state_manager_;
std::filesystem::path filePath_;
bool ok_{};
unique_evt_handle hBookmark_;
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index e7530d624..ef430204c 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -146,7 +146,7 @@ class ConsumeWindowsEventLog : public core::Processor {
const EVT_HANDLE& event_query_results);
std::shared_ptr<core::logging::Logger> logger_;
- core::CoreComponentStateManager* state_manager_{nullptr};
+ core::StateManager* state_manager_{nullptr};
wel::METADATA_NAMES header_names_;
std::optional<std::string> header_delimiter_;
std::string channel_;
diff --git a/extensions/windows-event-log/tests/BookmarkTests.cpp b/extensions/windows-event-log/tests/BookmarkTests.cpp
index eeb2c7b1c..139455d16 100644
--- a/extensions/windows-event-log/tests/BookmarkTests.cpp
+++ b/extensions/windows-event-log/tests/BookmarkTests.cpp
@@ -40,7 +40,7 @@ constexpr DWORD EVT_NEXT_TIMEOUT_MS = 100;
std::unique_ptr<Bookmark> createBookmark(TestPlan &test_plan,
const std::wstring &channel,
const utils::Identifier &uuid,
- core::CoreComponentStateManager* state_manager) {
+ core::StateManager* state_manager) {
const auto logger = test_plan.getLogger();
return std::make_unique<Bookmark>(channel, L"*", "", uuid, false, state_manager, logger);
}
@@ -103,7 +103,7 @@ TEST_CASE("Bookmark constructor works", "[create]") {
const utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
- auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+ auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
std::unique_ptr<Bookmark> bookmark = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
REQUIRE(bookmark);
REQUIRE(*bookmark);
@@ -120,7 +120,7 @@ TEST_CASE("Bookmark is restored from the state", "[create][state]") {
LogTestController::getInstance().setTrace<TestPlan>();
utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
- auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+ auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
std::wstring bookmark_xml_before = bookmarkAsXml(bookmark_before);
@@ -139,14 +139,14 @@ TEST_CASE("Bookmark created after a new event is different", "[create][state]")
LogTestController::getInstance().setTrace<TestPlan>();
utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
- auto state_manager_one = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_one);
+ auto state_manager_one = test_plan->getStateStorage()->getStateManager(uuid_one);
std::unique_ptr<Bookmark> bookmark_before = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one, state_manager_one.get());
reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
// different uuid, so we get a new, empty, state manager
- auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+ auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
std::unique_ptr<Bookmark> bookmark_after = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
REQUIRE(bookmarkAsXml(bookmark_before) != bookmarkAsXml(bookmark_after));
@@ -158,7 +158,7 @@ TEST_CASE("Bookmark::getBookmarkHandleFromXML() returns the same event from a co
LogTestController::getInstance().setTrace<TestPlan>();
utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
- auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+ auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
@@ -174,14 +174,14 @@ TEST_CASE("Bookmark::getBookmarkHandleFromXML() returns a different event after
GIVEN("We have two different bookmarks") {
const auto uuid = IdGenerator::getIdGenerator()->generate();
- auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+ auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
std::wstring bookmark_one_xml = bookmarkAsXml(bookmark_one);
reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
const utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
- auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+ auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
std::wstring bookmark_two_xml = bookmarkAsXml(bookmark_two);
@@ -208,7 +208,7 @@ TEST_CASE("Bookmark::getNewBookmarkXml() updates the bookmark", "[add_event]") {
LogTestController::getInstance().setTrace<TestPlan>();
const utils::Identifier uuid = IdGenerator::getIdGenerator()->generate();
- auto state_manager = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid);
+ auto state_manager = test_plan->getStateStorage()->getStateManager(uuid);
std::unique_ptr<Bookmark> bookmark = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid, state_manager.get());
std::wstring bookmark_xml_before = bookmarkAsXml(bookmark);
@@ -230,13 +230,13 @@ TEST_CASE("Bookmark::saveBookmarkXml() updates the bookmark and saves it to the
GIVEN("We have two different bookmarks with two different state managers") {
utils::Identifier uuid_one = IdGenerator::getIdGenerator()->generate();
- auto state_manager_one = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_one);
+ auto state_manager_one = test_plan->getStateStorage()->getStateManager(uuid_one);
std::unique_ptr<Bookmark> bookmark_one = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_one, state_manager_one.get());
reportEvent(APPLICATION_CHANNEL, "Something interesting happened");
utils::Identifier uuid_two = IdGenerator::getIdGenerator()->generate();
- auto state_manager_two = test_plan->getStateManagerProvider()->getCoreComponentStateManager(uuid_two);
+ auto state_manager_two = test_plan->getStateStorage()->getStateManager(uuid_two);
std::unique_ptr<Bookmark> bookmark_two = createBookmark(*test_plan, APPLICATION_CHANNEL, uuid_two, state_manager_two.get());
REQUIRE(bookmarkAsXml(bookmark_one) != bookmarkAsXml(bookmark_two));
diff --git a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
deleted file mode 100644
index 5a725a2f2..000000000
--- a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
-
-#include <unordered_map>
-#include <string>
-#include <memory>
-#include <map>
-
-#include "core/Core.h"
-#include "core/CoreComponentState.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace controllers {
-
-class AbstractCoreComponentStateManagerProvider : public core::CoreComponentStateManagerProvider {
- public:
- std::unique_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) override;
-
- std::map<utils::Identifier, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
-
- class AbstractCoreComponentStateManager : public core::CoreComponentStateManager {
- public:
- AbstractCoreComponentStateManager(AbstractCoreComponentStateManagerProvider* provider, const utils::Identifier& id);
-
- bool set(const core::CoreComponentState& kvs) override;
- bool get(core::CoreComponentState& kvs) override;
- bool clear() override;
- bool persist() override;
-
- bool isTransactionInProgress() const override;
- bool beginTransaction() override;
- bool commit() override;
- bool rollback() override;
-
- private:
- enum class ChangeType {
- NONE,
- SET,
- CLEAR
- };
-
- AbstractCoreComponentStateManagerProvider* provider_;
- utils::Identifier id_;
- bool state_valid_;
- core::CoreComponentState state_;
- bool transaction_in_progress_;
- ChangeType change_type_;
- core::CoreComponentState state_to_set_;
- };
-
- protected:
- virtual bool setImpl(const utils::Identifier& key, const std::string& serialized_state) = 0;
- virtual bool getImpl(const utils::Identifier& key, std::string& serialized_state) = 0;
- virtual bool getImpl(std::map<utils::Identifier, std::string>& kvs) = 0;
- virtual bool removeImpl(const utils::Identifier& key) = 0;
- virtual bool persistImpl() = 0;
-
- private:
- std::mutex mutex_;
-};
-
-} // namespace controllers
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_ABSTRACTCORECOMPONENTSTATEMANAGERPROVIDER_H_
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/AutoPersistor.h
similarity index 61%
rename from libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
rename to libminifi/include/controllers/keyvalue/AutoPersistor.h
index 35a87bb03..3cefe8c51 100644
--- a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
+++ b/libminifi/include/controllers/keyvalue/AutoPersistor.h
@@ -16,47 +16,46 @@
*/
#pragma once
+#include <condition_variable>
+#include <memory>
+#include <mutex>
#include <string>
#include <thread>
-#include <mutex>
-#include <memory>
#include <utility>
-#include "PersistableKeyValueStoreService.h"
+#include "core/ConfigurableComponent.h"
#include "core/Core.h"
-#include "properties/Configure.h"
-#include "core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
#include "utils/Export.h"
namespace org::apache::nifi::minifi::controllers {
-class AbstractAutoPersistingKeyValueStoreService : virtual public PersistableKeyValueStoreService {
+/**
+ * Persists in given intervals.
+ * Has an own thread, so stop() must be called before destruction of data used by persist_.
+ */
+class AutoPersistor {
public:
- explicit AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-
- ~AbstractAutoPersistingKeyValueStoreService() override;
+ ~AutoPersistor();
- static constexpr const char* AlwaysPersistPropertyName = "Always Persist";
- static constexpr const char* AutoPersistenceIntervalPropertyName = "Auto Persistence Interval";
+ void start(bool always_persist, std::chrono::milliseconds auto_persistence_interval, std::function<bool()> persist);
+ void stop();
- void onEnable() override;
- void notifyStop() override;
+ [[nodiscard]] bool isAlwaysPersisting() const {
+ return always_persist_;
+ }
- protected:
- bool always_persist_;
- std::chrono::milliseconds auto_persistence_interval_;
+ private:
+ void persistingThreadFunc();
+ bool always_persist_ = false;
+ std::chrono::milliseconds auto_persistence_interval_{0};
std::thread persisting_thread_;
- bool running_;
+ bool running_ = false;
std::mutex persisting_mutex_;
std::condition_variable persisting_cv_;
- void persistingThreadFunc();
-
- private:
- std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger();
-
- void stopPersistingThread();
+ std::function<bool()> persist_;
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<AutoPersistor>::getLogger();
};
} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStateManager.h b/libminifi/include/controllers/keyvalue/KeyValueStateManager.h
new file mode 100644
index 000000000..19df88993
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/KeyValueStateManager.h
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "core/StateManager.h"
+#include "core/Core.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+class KeyValueStateStorage;
+
+class KeyValueStateManager final : public core::StateManager {
+ public:
+ KeyValueStateManager(const utils::Identifier& id, gsl::not_null<KeyValueStateStorage*> storage);
+
+ bool set(const core::StateManager::State& kvs) override;
+ bool get(core::StateManager::State& kvs) override;
+ bool clear() override;
+ bool persist() override;
+
+ [[nodiscard]] bool isTransactionInProgress() const override;
+ bool beginTransaction() override;
+ bool commit() override;
+ bool rollback() override;
+
+ private:
+ enum class ChangeType {
+ NONE,
+ SET,
+ CLEAR
+ };
+
+ gsl::not_null<KeyValueStateStorage*> storage_;
+ std::optional<core::StateManager::State> state_;
+ bool transaction_in_progress_;
+ ChangeType change_type_;
+ core::StateManager::State state_to_set_;
+};
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
similarity index 54%
rename from libminifi/include/controllers/keyvalue/KeyValueStoreService.h
rename to libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
index 489b977a8..c0f2e5109 100644
--- a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
+++ b/libminifi/include/controllers/keyvalue/KeyValueStateStorage.h
@@ -14,44 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
-#include <unordered_map>
-#include <unordered_set>
+#pragma once
+
+#include <memory>
#include <string>
-#include <cstdint>
-#include <functional>
+#include <unordered_map>
-#include "core/Core.h"
-#include "properties/Configure.h"
#include "core/controller/ControllerService.h"
+#include "core/Core.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/StateManager.h"
+#include "core/StateStorage.h"
namespace org::apache::nifi::minifi::controllers {
-class KeyValueStoreService : public core::controller::ControllerService {
+constexpr const char* ALWAYS_PERSIST_PROPERTY_NAME = "Always Persist";
+constexpr const char* AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME = "Auto Persistence Interval";
+
+class KeyValueStateStorage : public core::StateStorage, public core::controller::ControllerService {
public:
- explicit KeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
+ explicit KeyValueStateStorage(const std::string& name, const utils::Identifier& uuid = {});
- ~KeyValueStoreService() override;
+ static core::StateManager::State deserialize(const std::string& serialized);
+ static std::string serialize(const core::StateManager::State& kvs);
- void yield() override;
- bool isRunning() override;
- bool isWorkAvailable() override;
+ std::unique_ptr<core::StateManager> getStateManager(const utils::Identifier& uuid) override;
+ std::unordered_map<utils::Identifier, core::StateManager::State> getAllStates() override;
- virtual bool set(const std::string& key, const std::string& value) = 0;
+ void yield() override {
+ }
- virtual bool get(const std::string& key, std::string& value) = 0;
+ bool isRunning() override {
+ return getState() == core::controller::ControllerServiceState::ENABLED;
+ }
- virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+ bool isWorkAvailable() override {
+ return false;
+ }
+ virtual bool set(const std::string& key, const std::string& value) = 0;
+ virtual bool get(const std::string& key, std::string& value) = 0;
+ virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
virtual bool remove(const std::string& key) = 0;
-
virtual bool clear() = 0;
-
virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) = 0;
+ virtual bool persist() = 0;
+
+ private:
+ bool getAll(std::unordered_map<utils::Identifier, std::string>& kvs);
+
+ std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<KeyValueStateStorage>::getLogger();
};
} // namespace org::apache::nifi::minifi::controllers
-
-#endif // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_KEYVALUESTORESERVICE_H_
diff --git a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
deleted file mode 100644
index 80b49f60f..000000000
--- a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
-#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
-
-#include <string>
-#include <unordered_map>
-#include <map>
-
-#include "KeyValueStoreService.h"
-#include "AbstractCoreComponentStateManagerProvider.h"
-#include "core/Core.h"
-#include "properties/Configure.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-class PersistableKeyValueStoreService : public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider {
- public:
- explicit PersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid = {});
-
- ~PersistableKeyValueStoreService() override;
-
- virtual bool persist() = 0;
-
- protected:
- bool setImpl(const utils::Identifier& key, const std::string& serialized_state) override;
- bool getImpl(const utils::Identifier& key, std::string& serialized_state) override;
- bool getImpl(std::map<utils::Identifier, std::string>& kvs) override;
- bool removeImpl(const utils::Identifier& key) override;
- bool persistImpl() override;
-};
-
-} // namespace org::apache::nifi::minifi::controllers
-
-#endif // LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_PERSISTABLEKEYVALUESTORESERVICE_H_
diff --git a/libminifi/include/core/CoreComponentState.h b/libminifi/include/core/CoreComponentState.h
deleted file mode 100644
index a0662a6d9..000000000
--- a/libminifi/include/core/CoreComponentState.h
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
-#define LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
-
-#include "Core.h"
-
-#include <cstdint>
-#include <map>
-#include <memory>
-#include <optional>
-#include <string>
-#include <unordered_map>
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-
-using CoreComponentState = std::unordered_map<std::string, std::string>;
-
-class CoreComponentStateManager {
- public:
- virtual ~CoreComponentStateManager() = default;
-
- virtual bool set(const CoreComponentState& kvs) = 0;
- virtual bool get(CoreComponentState& kvs) = 0;
-
- std::optional<std::unordered_map<std::string, std::string>> get() {
- std::unordered_map<std::string, std::string> out;
- if (get(out)) {
- return out;
- } else {
- return std::nullopt;
- }
- }
-
- virtual bool clear() = 0;
- virtual bool persist() = 0;
-
- virtual bool isTransactionInProgress() const = 0;
- virtual bool beginTransaction() = 0;
- virtual bool commit() = 0;
- virtual bool rollback() = 0;
-};
-
-class CoreComponentStateManagerProvider {
- public:
- virtual ~CoreComponentStateManagerProvider() = default;
-
- virtual std::unique_ptr<CoreComponentStateManager> getCoreComponentStateManager(const utils::Identifier& uuid) = 0;
-
- virtual std::unique_ptr<CoreComponentStateManager> getCoreComponentStateManager(const CoreComponent& component) {
- return getCoreComponentStateManager(component.getUUID());
- }
-
- virtual std::map<utils::Identifier, CoreComponentState> getAllCoreComponentStates() = 0;
-};
-
-} // namespace core
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
-
-#endif // LIBMINIFI_INCLUDE_CORE_CORECOMPONENTSTATE_H_
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index 2f4c23e5c..f008e6669 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -34,21 +34,19 @@
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ControllerServiceLookup.h"
#include "core/logging/LoggerFactory.h"
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "ProcessorNode.h"
#include "core/Repository.h"
#include "core/FlowFile.h"
-#include "core/CoreComponentState.h"
+#include "core/StateStorage.h"
#include "utils/file/FileUtils.h"
#include "utils/PropertyErrors.h"
#include "VariableRegistry.h"
namespace org::apache::nifi::minifi::core {
-// ProcessContext Class
class ProcessContext : public controller::ControllerServiceLookup, public core::VariableRegistry, public std::enable_shared_from_this<VariableRegistry> {
public:
- // Constructor
/*!
* Create a new process context associated with the processor/controller service/state manager
*/
@@ -63,10 +61,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
configure_(std::make_shared<minifi::Configure>()),
initialized_(false) {
repo_ = repo;
- state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, nullptr);
+ state_storage_ = getStateStorage(logger_, controller_service_provider_, nullptr);
}
- // Constructor
/*!
* Create a new process context associated with the processor/controller service/state manager
*/
@@ -82,13 +79,12 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
configure_(configuration),
initialized_(false) {
repo_ = repo;
- state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, configuration);
+ state_storage_ = getStateStorage(logger_, controller_service_provider_, configuration);
if (!configure_) {
configure_ = std::make_shared<minifi::Configure>();
}
}
- // Destructor
- virtual ~ProcessContext() = default;
+
// Get Processor associated with the Process Context
std::shared_ptr<ProcessorNode> getProcessorNode() const {
return processor_node_;
@@ -155,17 +151,13 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
bool setProperty(const Property& property, std::string value) {
return setProperty(property.getName(), value);
}
- // Whether the relationship is supported
- bool isSupportedRelationship(Relationship relationship) const {
- return processor_node_->isSupportedRelationship(relationship);
- }
// Check whether the relationship is auto terminated
bool isAutoTerminated(Relationship relationship) const {
return processor_node_->isAutoTerminated(relationship);
}
// Get ProcessContext Maximum Concurrent Tasks
- uint8_t getMaxConcurrentTasks(void) const {
+ uint8_t getMaxConcurrentTasks() const {
return processor_node_->getMaxConcurrentTasks();
}
// Yield based on the yield period
@@ -246,14 +238,14 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
return initialized_;
}
- static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider";
+ static constexpr char const* DefaultStateStorageName = "defaultstatestorage";
- CoreComponentStateManager* getStateManager() {
- if (state_manager_provider_ == nullptr) {
+ StateManager* getStateManager() {
+ if (state_storage_ == nullptr) {
return nullptr;
}
if (!state_manager_) {
- state_manager_ = state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+ state_manager_ = state_storage_->getStateManager(*processor_node_);
}
return state_manager_.get();
}
@@ -262,64 +254,64 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
return state_manager_ != nullptr;
}
- static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
+ static std::shared_ptr<core::StateStorage> getOrCreateDefaultStateStorage(
controller::ControllerServiceProvider* controller_service_provider,
const std::shared_ptr<minifi::Configure>& configuration) {
static std::mutex mutex;
std::lock_guard<std::mutex> lock(mutex);
/* See if we have already created a default provider */
- std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
+ std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateStorageName);
if (node != nullptr) {
- return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+ return std::dynamic_pointer_cast<core::StateStorage>(node->getControllerServiceImplementation());
}
/* Try to get configuration options for default provider */
- std::string always_persist, auto_persistence_interval;
- configuration->get(Configure::nifi_state_management_provider_local_always_persist, always_persist);
- configuration->get(Configure::nifi_state_management_provider_local_auto_persistence_interval, auto_persistence_interval);
- const auto path = configuration->get(Configure::nifi_state_management_provider_local_path);
+ std::string always_persist;
+ configuration->get(Configure::nifi_state_storage_local_always_persist, Configure::nifi_state_storage_local_always_persist_old, always_persist);
+ std::string auto_persistence_interval;
+ configuration->get(Configure::nifi_state_storage_local_auto_persistence_interval, Configure::nifi_state_storage_local_auto_persistence_interval_old, auto_persistence_interval);
+
+ const auto path = configuration->getWithFallback(Configure::nifi_state_storage_local_path, Configure::nifi_state_storage_local_path_old);
- /* Function to help creating a provider */
+ /* Function to help creating a state storage */
auto create_provider = [&](
const std::string& type,
const std::string& longType,
- const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::CoreComponentStateManagerProvider> {
- node = controller_service_provider->createControllerService(type, longType, DefaultStateManagerProviderName, true /*firstTimeAdded*/);
+ const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::StateStorage> {
+ node = controller_service_provider->createControllerService(type, longType, DefaultStateStorageName, true /*firstTimeAdded*/);
if (node == nullptr) {
return nullptr;
}
node->initialize();
- auto provider = node->getControllerServiceImplementation();
- if (provider == nullptr) {
+ auto storage = node->getControllerServiceImplementation();
+ if (storage == nullptr) {
return nullptr;
}
- if (!always_persist.empty() && !provider->setProperty(
- controllers::AbstractAutoPersistingKeyValueStoreService::AlwaysPersistPropertyName, always_persist)) {
+ if (!always_persist.empty() && !storage->setProperty(controllers::ALWAYS_PERSIST_PROPERTY_NAME, always_persist)) {
return nullptr;
}
- if (!auto_persistence_interval.empty() && !provider->setProperty(
- controllers::AbstractAutoPersistingKeyValueStoreService::AutoPersistenceIntervalPropertyName, auto_persistence_interval)) {
+ if (!auto_persistence_interval.empty() && !storage->setProperty(controllers::AUTO_PERSISTENCE_INTERVAL_PROPERTY_NAME, auto_persistence_interval)) {
return nullptr;
}
for (const auto& extraProperty : extraProperties) {
- if (!provider->setProperty(extraProperty.first, extraProperty.second)) {
+ if (!storage->setProperty(extraProperty.first, extraProperty.second)) {
return nullptr;
}
}
if (!node->enable()) {
return nullptr;
}
- return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(provider);
+ return std::dynamic_pointer_cast<core::StateStorage>(storage);
};
std::string preferredType;
- configuration->get(minifi::Configure::nifi_state_management_provider_local_class_name, preferredType);
+ configuration->get(minifi::Configure::nifi_state_storage_local_class_name, minifi::Configure::nifi_state_storage_local_class_name_old, preferredType);
/* Try to create a RocksDB-backed provider */
- if (preferredType.empty() || preferredType == "RocksDbPersistableKeyValueStoreService") {
- auto provider = create_provider("RocksDbPersistableKeyValueStoreService",
- "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+ if (preferredType.empty() || preferredType == "RocksDbPersistableKeyValueStoreService" || preferredType == "RocksDbStateStorage") {
+ auto provider = create_provider("RocksDbStateStorage",
+ "org.apache.nifi.minifi.controllers.RocksDbStateStorage",
{{"Directory", path.value_or("corecomponentstate")}});
if (provider != nullptr) {
return provider;
@@ -327,9 +319,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
}
/* Fall back to a locked unordered map-backed provider */
- if (preferredType.empty() || preferredType == "UnorderedMapPersistableKeyValueStoreService") {
- auto provider = create_provider("UnorderedMapPersistableKeyValueStoreService",
- "org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+ if (preferredType.empty() || preferredType == "UnorderedMapPersistableKeyValueStoreService" || preferredType == "PersistentMapStateStorage") {
+ auto provider = create_provider("PersistentMapStateStorage",
+ "org.apache.nifi.minifi.controllers.PersistentMapStateStorage",
{{"File", path.value_or("corecomponentstate.txt")}});
if (provider != nullptr) {
return provider;
@@ -337,9 +329,9 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
}
/* Fall back to volatile memory-backed provider */
- if (preferredType.empty() || preferredType == "UnorderedMapKeyValueStoreService") {
- auto provider = create_provider("UnorderedMapKeyValueStoreService",
- "org.apache.nifi.minifi.controllers.UnorderedMapKeyValueStoreService",
+ if (preferredType.empty() || preferredType == "UnorderedMapKeyValueStoreService" || preferredType == "VolatileMapStateStorage") {
+ auto provider = create_provider("VolatileMapStateStorage",
+ "org.apache.nifi.minifi.controllers.VolatileMapStateStorage",
{});
if (provider != nullptr) {
return provider;
@@ -350,27 +342,27 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
return nullptr;
}
- static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
+ static std::shared_ptr<core::StateStorage> getStateStorage(
const std::shared_ptr<logging::Logger>& logger,
controller::ControllerServiceProvider* const controller_service_provider,
const std::shared_ptr<minifi::Configure>& configuration) {
if (controller_service_provider == nullptr) {
return nullptr;
}
- std::string requestedStateManagerName;
- if (configuration != nullptr && configuration->get(minifi::Configure::nifi_state_management_provider_local, requestedStateManagerName)) {
- auto node = controller_service_provider->getControllerServiceNode(requestedStateManagerName);
+ std::string requestedStateStorageName;
+ if (configuration != nullptr && configuration->get(minifi::Configure::nifi_state_storage_local, minifi::Configure::nifi_state_storage_local_old, requestedStateStorageName)) {
+ auto node = controller_service_provider->getControllerServiceNode(requestedStateStorageName);
if (node == nullptr) {
- logger->log_error("Failed to find the CoreComponentStateManagerProvider %s defined by %s", requestedStateManagerName, minifi::Configure::nifi_state_management_provider_local);
+ logger->log_error("Failed to find the StateStorage %s defined by %s", requestedStateStorageName, minifi::Configure::nifi_state_storage_local);
return nullptr;
}
- return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+ return std::dynamic_pointer_cast<core::StateStorage>(node->getControllerServiceImplementation());
} else {
- auto state_manager_provider = getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration);
- if (state_manager_provider == nullptr) {
- logger->log_error("Failed to create default CoreComponentStateManagerProvider");
+ auto state_storage = getOrCreateDefaultStateStorage(controller_service_provider, configuration);
+ if (state_storage == nullptr) {
+ logger->log_error("Failed to create default StateStorage");
}
- return state_manager_provider;
+ return state_storage;
}
}
@@ -381,8 +373,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
}
controller::ControllerServiceProvider* controller_service_provider_;
- std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
- std::unique_ptr<CoreComponentStateManager> state_manager_;
+ std::shared_ptr<core::StateStorage> state_storage_;
+ std::unique_ptr<StateManager> state_manager_;
std::shared_ptr<core::Repository> repo_;
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::ContentRepository> content_repo_;
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 208f7f100..49d8efee7 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -215,7 +215,7 @@ class ProcessSession : public ReferenceContainer {
std::shared_ptr<ContentSession> content_session_;
- CoreComponentStateManager* stateManager_;
+ StateManager* stateManager_;
static std::shared_ptr<utils::IdGenerator> id_generator_;
diff --git a/libminifi/include/core/StateManager.h b/libminifi/include/core/StateManager.h
new file mode 100644
index 000000000..89ef8b2cd
--- /dev/null
+++ b/libminifi/include/core/StateManager.h
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "Core.h"
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+
+namespace org::apache::nifi::minifi::core {
+
+/**
+ * Stores state for one component.
+ * Supported operations: get(), set(), clear(), persist().
+ * Behavior can be transactional. Use beginTransaction() to enter a transaction and commit() or rollback() to conclude it.
+ */
+class StateManager {
+ public:
+ using State = std::unordered_map<std::string, std::string>;
+
+ explicit StateManager(const utils::Identifier& id)
+ : id_(id) {
+ }
+
+ virtual ~StateManager() = default;
+
+ virtual bool set(const State& kvs) = 0;
+ virtual bool get(State& kvs) = 0;
+
+ std::optional<State> get() {
+ if (State out; get(out)) {
+ return out;
+ }
+ return std::nullopt;
+ }
+
+ virtual bool clear() = 0;
+ virtual bool persist() = 0;
+
+ [[nodiscard]] virtual bool isTransactionInProgress() const = 0;
+
+ virtual bool beginTransaction() = 0;
+ virtual bool commit() = 0;
+ virtual bool rollback() = 0;
+
+ protected:
+ utils::Identifier id_;
+};
+
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp b/libminifi/include/core/StateStorage.h
similarity index 52%
rename from libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
rename to libminifi/include/core/StateStorage.h
index 717ddbeb1..6870b47b9 100644
--- a/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
+++ b/libminifi/include/core/StateStorage.h
@@ -15,25 +15,31 @@
* limitations under the License.
*/
-#include "controllers/keyvalue/KeyValueStoreService.h"
+#pragma once
-namespace org::apache::nifi::minifi::controllers {
+#include "Core.h"
+#include "StateManager.h"
-KeyValueStoreService::KeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : ControllerService(std::move(name), uuid) {
-}
+#include <memory>
+#include <string>
+#include <unordered_map>
-KeyValueStoreService::~KeyValueStoreService() = default;
+namespace org::apache::nifi::minifi::core {
-void KeyValueStoreService::yield() {
-}
+/**
+ * Serves as a state storage background for the entire application, all StateManagers are created by it and use it.
+ */
+class StateStorage {
+ public:
+ virtual ~StateStorage() = default;
+
+ virtual std::unique_ptr<StateManager> getStateManager(const utils::Identifier& uuid) = 0;
-bool KeyValueStoreService::isRunning() {
- return getState() == core::controller::ControllerServiceState::ENABLED;
-}
+ std::unique_ptr<StateManager> getStateManager(const CoreComponent& component) {
+ return getStateManager(component.getUUID());
+ }
-bool KeyValueStoreService::isWorkAvailable() {
- return false;
-}
+ virtual std::unordered_map<utils::Identifier, StateManager::State> getAllStates() = 0;
+};
-} // namespace org::apache::nifi::minifi::controllers
+} // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h
index 24d0d28a8..eb2f598dc 100644
--- a/libminifi/include/properties/Configuration.h
+++ b/libminifi/include/properties/Configuration.h
@@ -121,11 +121,16 @@ class Configuration : public Properties {
static constexpr const char *nifi_c2_mqtt_update_topic = "nifi.c2.mqtt.update.topic";
// state management options
- static constexpr const char *nifi_state_management_provider_local = "nifi.state.management.provider.local";
- static constexpr const char *nifi_state_management_provider_local_class_name = "nifi.state.management.provider.local.class.name";
- static constexpr const char *nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
- static constexpr const char *nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
- static constexpr const char *nifi_state_management_provider_local_path = "nifi.state.management.provider.local.path";
+ static constexpr const char *nifi_state_storage_local = "nifi.state.storage.local";
+ static constexpr const char *nifi_state_storage_local_old = "nifi.state.management.provider.local";
+ static constexpr const char *nifi_state_storage_local_class_name = "nifi.state.storage.local.class.name";
+ static constexpr const char *nifi_state_storage_local_class_name_old = "nifi.state.management.provider.local.class.name";
+ static constexpr const char *nifi_state_storage_local_always_persist = "nifi.state.storage.local.always.persist";
+ static constexpr const char *nifi_state_storage_local_always_persist_old = "nifi.state.management.provider.local.always.persist";
+ static constexpr const char *nifi_state_storage_local_auto_persistence_interval = "nifi.state.storage.local.auto.persistence.interval";
+ static constexpr const char *nifi_state_storage_local_auto_persistence_interval_old = "nifi.state.management.provider.local.auto.persistence.interval";
+ static constexpr const char *nifi_state_storage_local_path = "nifi.state.storage.local.path";
+ static constexpr const char *nifi_state_storage_local_path_old = "nifi.state.management.provider.local.path";
// disk space watchdog options
static constexpr const char *minifi_disk_space_watchdog_enable = "minifi.disk.space.watchdog.enable";
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 76d8b8fc2..46a6eff6b 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -28,20 +28,20 @@
struct ConfigTestAccessor;
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
class Configure : public Configuration, public core::AgentIdentificationProvider {
friend struct ::ConfigTestAccessor;
public:
explicit Configure(std::optional<Decryptor> decryptor = std::nullopt, std::shared_ptr<core::logging::LoggerProperties> logger_properties = {})
- : Configuration{}, decryptor_(std::move(decryptor)), logger_properties_(std::move(logger_properties)) {}
+ : decryptor_(std::move(decryptor))
+ , logger_properties_(std::move(logger_properties)) {
+ }
bool get(const std::string& key, std::string& value) const;
bool get(const std::string& key, const std::string& alternate_key, std::string& value) const;
std::optional<std::string> get(const std::string& key) const;
+ std::optional<std::string> getWithFallback(const std::string& key, const std::string& alternate_key) const;
std::optional<std::string> getRawValue(const std::string& key) const;
std::optional<std::string> getAgentClass() const override;
@@ -56,7 +56,7 @@ class Configure : public Configuration, public core::AgentIdentificationProvider
private:
// WARNING! a test utility
void setLoggerProperties(std::shared_ptr<core::logging::LoggerProperties> new_properties) {
- logger_properties_ = new_properties;
+ logger_properties_ = std::move(new_properties);
}
bool isEncrypted(const std::string& key) const;
@@ -68,7 +68,4 @@ class Configure : public Configuration, public core::AgentIdentificationProvider
std::shared_ptr<core::logging::LoggerProperties> logger_properties_;
};
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/utils/ListingStateManager.h b/libminifi/include/utils/ListingStateManager.h
index d97e1492e..7fa9501a9 100644
--- a/libminifi/include/utils/ListingStateManager.h
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -25,7 +25,7 @@
#include <chrono>
#include <utility>
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
@@ -49,7 +49,7 @@ struct ListingState {
class ListingStateManager {
public:
- explicit ListingStateManager(core::CoreComponentStateManager* state_manager)
+ explicit ListingStateManager(core::StateManager* state_manager)
: state_manager_(state_manager) {
}
@@ -63,7 +63,7 @@ class ListingStateManager {
[[nodiscard]] static uint64_t getLatestListedKeyTimestampInMilliseconds(const std::unordered_map<std::string, std::string> &state);
[[nodiscard]] static std::unordered_set<std::string> getLatestListedKeys(const std::unordered_map<std::string, std::string> &state);
- core::CoreComponentStateManager* state_manager_;
+ core::StateManager* state_manager_;
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ListingStateManager>::getLogger()};
};
diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp
index baba87119..69b24e9e8 100644
--- a/libminifi/src/Configuration.cpp
+++ b/libminifi/src/Configuration.cpp
@@ -98,11 +98,16 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP
core::ConfigurationProperty{Configuration::nifi_c2_mqtt_connector_service},
core::ConfigurationProperty{Configuration::nifi_c2_mqtt_heartbeat_topic},
core::ConfigurationProperty{Configuration::nifi_c2_mqtt_update_topic},
- core::ConfigurationProperty{Configuration::nifi_state_management_provider_local},
- core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_class_name},
- core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
- core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
- core::ConfigurationProperty{Configuration::nifi_state_management_provider_local_path},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_old},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name_old},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist_old, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval_old, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_path},
+ core::ConfigurationProperty{Configuration::nifi_state_storage_local_path_old},
core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())},
core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_stop_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())},
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 7067c2684..60f27c809 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -24,10 +24,7 @@
#include "utils/StringUtils.h"
#include "properties/Configuration.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+namespace org::apache::nifi::minifi {
bool Configure::get(const std::string& key, std::string& value) const {
if (auto opt_value = getRawValue(key)) {
@@ -64,6 +61,14 @@ std::optional<std::string> Configure::get(const std::string& key) const {
return std::nullopt;
}
+std::optional<std::string> Configure::getWithFallback(const std::string& key, const std::string& alternate_key) const {
+ std::string value;
+ if (get(key, alternate_key, value)) {
+ return value;
+ }
+ return std::nullopt;
+}
+
std::optional<std::string> Configure::getRawValue(const std::string& key) const {
static constexpr std::string_view log_prefix = "nifi.log.";
if (utils::StringUtils::startsWith(key, log_prefix)) {
@@ -129,7 +134,4 @@ bool Configure::commitChanges() {
return success;
}
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+} // namespace org::apache::nifi::minifi
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index fa1716894..76f5be0ad 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -31,7 +31,7 @@
#include "c2/ControllerSocketProtocol.h"
#include "core/ProcessContext.h"
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
#include "core/state/UpdateController.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
@@ -450,11 +450,11 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
}
case ClearOperand::CORECOMPONENTSTATE: {
for (const auto& corecomponent : resp.operation_arguments) {
- auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
- if (state_manager_provider != nullptr) {
- update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_manager_provider] (state::StateController& component) {
+ auto state_storage = core::ProcessContext::getStateStorage(logger_, controller_, configuration_);
+ if (state_storage != nullptr) {
+ update_sink_->executeOnComponent(corecomponent.second.to_string(), [this, &state_storage] (state::StateController& component) {
logger_->log_debug("Clearing state for component %s", component.getComponentName());
- auto state_manager = state_manager_provider->getCoreComponentStateManager(component.getComponentUUID());
+ auto state_manager = state_storage->getStateManager(component.getComponentUUID());
if (state_manager != nullptr) {
component.stop();
state_manager->clear();
@@ -465,7 +465,7 @@ void C2Agent::handle_clear(const C2ContentResponse &resp) {
}
});
} else {
- logger_->log_error("Failed to get StateManagerProvider");
+ logger_->log_error("Failed to get StateStorage");
}
}
break;
@@ -562,9 +562,9 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
response.setLabel("corecomponentstate");
C2Payload states(Operation::ACKNOWLEDGE, resp.ident, true);
states.setLabel("corecomponentstate");
- auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
- if (state_manager_provider != nullptr) {
- auto core_component_states = state_manager_provider->getAllCoreComponentStates();
+ auto state_storage = core::ProcessContext::getStateStorage(logger_, controller_, configuration_);
+ if (state_storage != nullptr) {
+ auto core_component_states = state_storage->getAllStates();
for (const auto& core_component_state : core_component_states) {
C2Payload state(Operation::ACKNOWLEDGE, resp.ident, true);
state.setLabel(core_component_state.first.to_string());
diff --git a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
deleted file mode 100644
index b9e4df383..000000000
--- a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
-#include "Exception.h"
-
-#include <memory>
-
-#include "rapidjson/rapidjson.h"
-#include "rapidjson/document.h"
-#include "rapidjson/stringbuffer.h"
-#include "rapidjson/writer.h"
-
-#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
-
-namespace {
-using org::apache::nifi::minifi::core::CoreComponentState;
-
-std::string serialize(const CoreComponentState &kvs) {
- rapidjson::Document doc(rapidjson::kObjectType);
- rapidjson::Document::AllocatorType &alloc = doc.GetAllocator();
- for (const auto &kv : kvs) {
- doc.AddMember(rapidjson::StringRef(kv.first.c_str(), kv.first.size()),
- rapidjson::StringRef(kv.second.c_str(), kv.second.size()), alloc);
- }
-
- rapidjson::StringBuffer buffer;
- rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
- doc.Accept(writer);
-
- return buffer.GetString();
-}
-
-CoreComponentState deserialize(const std::string &serialized) {
- rapidjson::StringStream stream(serialized.c_str());
- rapidjson::Document doc;
- rapidjson::ParseResult res = doc.ParseStream(stream);
- if (!res || !doc.IsObject()) {
- using org::apache::nifi::minifi::Exception;
- using org::apache::nifi::minifi::FILE_OPERATION_EXCEPTION;
- throw Exception(FILE_OPERATION_EXCEPTION, "Could not deserialize saved state, error during JSON parsing.");
- }
-
- CoreComponentState retState;
- for (const auto &kv : doc.GetObject()) {
- retState[kv.name.GetString()] = kv.value.GetString();
- }
-
- return retState;
-}
-} // anonymous namespace
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace controllers {
-
-AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
- AbstractCoreComponentStateManagerProvider* provider,
- const utils::Identifier& id)
- : provider_(provider)
- , id_(id)
- , state_valid_(false)
- , transaction_in_progress_(false)
- , change_type_(ChangeType::NONE) {
- std::string serialized;
- if (provider_->getImpl(id_, serialized)) {
- state_ = deserialize(serialized);
- state_valid_ = true;
- }
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const core::CoreComponentState& kvs) {
- bool autoCommit = false;
- if (!transaction_in_progress_) {
- autoCommit = true;
- transaction_in_progress_ = true;
- }
-
- change_type_ = ChangeType::SET;
- state_to_set_ = kvs;
-
- if (autoCommit) {
- return commit();
- }
- return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(core::CoreComponentState& kvs) {
- if (!state_valid_) {
- return false;
- }
- // not allowed, if there were modifications (dirty read)
- if (change_type_ != ChangeType::NONE) {
- return false;
- }
- kvs = state_;
- return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear() {
- if (!state_valid_) {
- return false;
- }
-
- bool autoCommit = false;
- if (!transaction_in_progress_) {
- autoCommit = true;
- transaction_in_progress_ = true;
- }
-
- change_type_ = ChangeType::CLEAR;
- state_to_set_.clear();
-
- if (autoCommit) {
- return commit();
- }
- return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::persist() {
- return provider_->persistImpl();
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::isTransactionInProgress() const {
- return transaction_in_progress_;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::beginTransaction() {
- if (transaction_in_progress_) {
- return false;
- }
- transaction_in_progress_ = true;
- return true;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::commit() {
- if (!transaction_in_progress_) {
- return false;
- }
-
- bool success = true;
-
- // actually make the pending changes
- if (change_type_ == ChangeType::SET) {
- if (provider_->setImpl(id_, serialize(state_to_set_))) {
- state_valid_ = true;
- state_ = state_to_set_;
- } else {
- success = false;
- }
- } else if (change_type_ == ChangeType::CLEAR) {
- if (!state_valid_) { // NOLINT(bugprone-branch-clone)
- success = false;
- } else if (provider_->removeImpl(id_)) {
- state_valid_ = false;
- state_.clear();
- } else {
- success = false;
- }
- }
-
- if (success && change_type_ != ChangeType::NONE) {
- success = persist();
- }
-
- change_type_ = ChangeType::NONE;
- state_to_set_.clear();
- transaction_in_progress_ = false;
- return success;
-}
-
-bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::rollback() {
- if (!transaction_in_progress_) {
- return false;
- }
-
- change_type_ = ChangeType::NONE;
- state_to_set_.clear();
- transaction_in_progress_ = false;
- return true;
-}
-
-std::unique_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const utils::Identifier& uuid) {
- std::lock_guard<std::mutex> guard(mutex_);
- return std::make_unique<AbstractCoreComponentStateManager>(this, uuid);
-}
-
-std::map<utils::Identifier, core::CoreComponentState> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
- std::map<utils::Identifier, std::string> all_serialized;
- if (!getImpl(all_serialized)) {
- return {};
- }
-
- std::map<utils::Identifier, core::CoreComponentState> all_deserialized;
- for (const auto& serialized : all_serialized) {
- all_deserialized.emplace(serialized.first, deserialize(serialized.second));
- }
-
- return all_deserialized;
-}
-
-} // namespace controllers
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
diff --git a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/AutoPersistor.cpp
similarity index 52%
rename from libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
rename to libminifi/src/controllers/keyvalue/AutoPersistor.cpp
index 4a8535815..d5dcded6a 100644
--- a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
+++ b/libminifi/src/controllers/keyvalue/AutoPersistor.cpp
@@ -15,72 +15,49 @@
* limitations under the License.
*/
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
-
#include <cinttypes>
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "core/PropertyBuilder.h"
+#include "core/TypedValues.h"
+
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::controllers {
-AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : PersistableKeyValueStoreService(std::move(name), uuid)
- , always_persist_(false)
- , auto_persistence_interval_(0U)
- , running_(false) {
-}
-
-AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() {
- stopPersistingThread();
+AutoPersistor::~AutoPersistor() {
+ stop();
}
-void AbstractAutoPersistingKeyValueStoreService::stopPersistingThread() {
+void AutoPersistor::stop() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
if (persisting_thread_.joinable()) {
running_ = false;
- persisting_cv_.notify_one();
lock.unlock();
+ persisting_cv_.notify_one();
persisting_thread_.join();
}
}
-void AbstractAutoPersistingKeyValueStoreService::onEnable() {
+void AutoPersistor::start(bool always_persist, std::chrono::milliseconds auto_persistence_interval, std::function<bool()> persist) {
std::unique_lock<std::mutex> lock(persisting_mutex_);
- if (configuration_ == nullptr) {
- logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService");
- return;
- }
-
- std::string value;
- if (!getProperty(AlwaysPersistPropertyName, value)) {
- logger_->log_error("Always Persist attribute is missing or invalid");
- } else {
- always_persist_ = utils::StringUtils::toBool(value).value_or(false);
- }
- core::TimePeriodValue auto_persistence_interval;
- if (!getProperty(AutoPersistenceIntervalPropertyName, auto_persistence_interval)) {
- logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
- } else {
- auto_persistence_interval_ = auto_persistence_interval.getMilliseconds();
- }
+ always_persist_ = always_persist;
+ auto_persistence_interval_ = auto_persistence_interval;
+ persist_ = std::move(persist);
if (!always_persist_ && auto_persistence_interval_ != 0s) {
if (!persisting_thread_.joinable()) {
logger_->log_trace("Starting auto persistence thread");
running_ = true;
- persisting_thread_ = std::thread(&AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc, this);
+ persisting_thread_ = std::thread(&AutoPersistor::persistingThreadFunc, this);
}
}
- logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
+ logger_->log_trace("Enabled AutoPersistor");
}
-void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
- stopPersistingThread();
-}
-
-void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
+void AutoPersistor::persistingThreadFunc() {
std::unique_lock<std::mutex> lock(persisting_mutex_);
while (true) {
@@ -94,7 +71,11 @@ void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
return;
}
- persist();
+ if (!persist_) {
+ logger_->log_error("Persist function is empty");
+ } else if (!persist_()) {
+ logger_->log_error("Persisting failed");
+ }
}
}
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp b/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp
new file mode 100644
index 000000000..bf2f1f3df
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/KeyValueStateManager.cpp
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+
+namespace org::apache::nifi::minifi::controllers {
+
+KeyValueStateManager::KeyValueStateManager(
+ const utils::Identifier& id,
+ gsl::not_null<KeyValueStateStorage*> storage)
+ : StateManager(id),
+ storage_(storage),
+ transaction_in_progress_(false),
+ change_type_(ChangeType::NONE) {
+ std::string serialized;
+ if (storage_->get(id_.to_string(), serialized)) {
+ state_ = KeyValueStateStorage::deserialize(serialized);
+ }
+}
+
+bool KeyValueStateManager::set(const core::StateManager::State& kvs) {
+ bool auto_commit = false;
+ if (!transaction_in_progress_) {
+ auto_commit = true;
+ transaction_in_progress_ = true;
+ }
+
+ change_type_ = ChangeType::SET;
+ state_to_set_ = kvs;
+
+ if (auto_commit) {
+ return commit();
+ }
+ return true;
+}
+
+bool KeyValueStateManager::get(core::StateManager::State& kvs) {
+ if (!state_) {
+ return false;
+ }
+ // not allowed, if there were modifications (dirty read)
+ if (change_type_ != ChangeType::NONE) {
+ return false;
+ }
+ kvs = *state_;
+ return true;
+}
+
+bool KeyValueStateManager::clear() {
+ if (!state_) {
+ return false;
+ }
+
+ bool auto_commit = false;
+ if (!transaction_in_progress_) {
+ auto_commit = true;
+ transaction_in_progress_ = true;
+ }
+
+ change_type_ = ChangeType::CLEAR;
+ state_to_set_.clear();
+
+ if (auto_commit) {
+ return commit();
+ }
+ return true;
+}
+
+bool KeyValueStateManager::persist() {
+ return storage_->persist();
+}
+
+bool KeyValueStateManager::isTransactionInProgress() const {
+ return transaction_in_progress_;
+}
+
+bool KeyValueStateManager::beginTransaction() {
+ if (transaction_in_progress_) {
+ return false;
+ }
+ transaction_in_progress_ = true;
+ return true;
+}
+
+bool KeyValueStateManager::commit() {
+ if (!transaction_in_progress_) {
+ return false;
+ }
+
+ bool success = true;
+
+ // actually make the pending changes
+ if (change_type_ == ChangeType::SET) {
+ if (storage_->set(id_.to_string(), KeyValueStateStorage::serialize(state_to_set_))) {
+ state_ = state_to_set_;
+ } else {
+ success = false;
+ }
+ } else if (change_type_ == ChangeType::CLEAR) {
+ if (state_ && storage_->remove(id_.to_string())) {
+ state_.reset();
+ } else {
+ success = false;
+ }
+ }
+
+ if (success && change_type_ != ChangeType::NONE) {
+ success = persist();
+ }
+
+ change_type_ = ChangeType::NONE;
+ state_to_set_.clear();
+ transaction_in_progress_ = false;
+ return success;
+}
+
+bool KeyValueStateManager::rollback() {
+ if (!transaction_in_progress_) {
+ return false;
+ }
+
+ change_type_ = ChangeType::NONE;
+ state_to_set_.clear();
+ transaction_in_progress_ = false;
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp b/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp
new file mode 100644
index 000000000..e45fa2133
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/KeyValueStateStorage.cpp
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include "Exception.h"
+#include "controllers/keyvalue/KeyValueStateManager.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+#include "utils/gsl.h"
+
+#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
+
+namespace org::apache::nifi::minifi::controllers {
+
+std::string KeyValueStateStorage::serialize(const core::StateManager::State& kvs) {
+ rapidjson::Document doc(rapidjson::kObjectType);
+ rapidjson::Document::AllocatorType &alloc = doc.GetAllocator();
+ for (const auto &kv : kvs) {
+ doc.AddMember(rapidjson::StringRef(kv.first.c_str(), kv.first.size()),
+ rapidjson::StringRef(kv.second.c_str(), kv.second.size()), alloc);
+ }
+
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ doc.Accept(writer);
+
+ return buffer.GetString();
+}
+
+core::StateManager::State KeyValueStateStorage::deserialize(const std::string& serialized) {
+ rapidjson::Document doc;
+ rapidjson::ParseResult res = doc.Parse(serialized.c_str(), serialized.length());
+ if (!res || !doc.IsObject()) {
+ using org::apache::nifi::minifi::Exception;
+ using org::apache::nifi::minifi::FILE_OPERATION_EXCEPTION;
+ throw Exception(FILE_OPERATION_EXCEPTION, "Could not deserialize saved state, error during JSON parsing.");
+ }
+
+ core::StateManager::State retState;
+ for (const auto &kv : doc.GetObject()) {
+ retState[kv.name.GetString()] = kv.value.GetString();
+ }
+
+ return retState;
+}
+
+KeyValueStateStorage::KeyValueStateStorage(const std::string& name, const utils::Identifier& uuid)
+ : ControllerService(name, uuid) {
+}
+
+std::unique_ptr<core::StateManager> KeyValueStateStorage::getStateManager(const utils::Identifier& uuid) {
+ return std::make_unique<KeyValueStateManager>(uuid, gsl::make_not_null(this));
+}
+
+std::unordered_map<utils::Identifier, core::StateManager::State> KeyValueStateStorage::getAllStates() {
+ std::unordered_map<utils::Identifier, std::string> all_serialized;
+ if (!getAll(all_serialized)) {
+ return {};
+ }
+
+ std::unordered_map<utils::Identifier, core::StateManager::State> all_deserialized;
+ for (const auto& serialized : all_serialized) {
+ all_deserialized.emplace(serialized.first, deserialize(serialized.second));
+ }
+
+ return all_deserialized;
+}
+
+bool KeyValueStateStorage::getAll(std::unordered_map<utils::Identifier, std::string>& kvs) {
+ std::unordered_map<std::string, std::string> states;
+ if (!get(states)) {
+ return false;
+ }
+ kvs.clear();
+ for (const auto& state : states) {
+ const auto optional_uuid = utils::Identifier::parse(state.first);
+ if (optional_uuid) {
+ kvs[optional_uuid.value()] = state.second;
+ } else {
+ logger_->log_error("Found non-UUID key \"%s\" in storage implementation", state.first);
+ }
+ }
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
deleted file mode 100644
index 7a17c010b..000000000
--- a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org::apache::nifi::minifi::controllers {
-
-PersistableKeyValueStoreService::PersistableKeyValueStoreService(std::string name, const utils::Identifier& uuid /*= utils::Identifier()*/)
- : KeyValueStoreService(std::move(name), uuid) {
-}
-
-PersistableKeyValueStoreService::~PersistableKeyValueStoreService() = default;
-
-bool PersistableKeyValueStoreService::setImpl(const utils::Identifier& key, const std::string& serialized_state) {
- return set(key.to_string(), serialized_state);
-}
-
-bool PersistableKeyValueStoreService::getImpl(const utils::Identifier& key, std::string& serialized_state) {
- return get(key.to_string(), serialized_state);
-}
-
-bool PersistableKeyValueStoreService::getImpl(std::map<utils::Identifier, std::string>& kvs) {
- std::unordered_map<std::string, std::string> states;
- if (!get(states)) {
- return false;
- }
- kvs.clear();
- for (const auto& state : states) {
- const auto optional_uuid = utils::Identifier::parse(state.first);
- if (optional_uuid) {
- kvs[optional_uuid.value()] = state.second;
- } else {
- core::logging::LoggerFactory<PersistableKeyValueStoreService>::getLogger()
- ->log_error("Found non-UUID key \"%s\" in storage implementation", state.first);
- }
- }
- return true;
-}
-
-bool PersistableKeyValueStoreService::removeImpl(const utils::Identifier& key) {
- return remove(key.to_string());
-}
-
-bool PersistableKeyValueStoreService::persistImpl() {
- return persist();
-}
-
-} // namespace org::apache::nifi::minifi::controllers
diff --git a/libminifi/test/StatefulProcessor.h b/libminifi/test/StatefulProcessor.h
index 2afefcd89..0fcfe18f3 100644
--- a/libminifi/test/StatefulProcessor.h
+++ b/libminifi/test/StatefulProcessor.h
@@ -21,7 +21,7 @@
#include <utility>
#include <vector>
#include "core/Processor.h"
-#include "core/CoreComponentState.h"
+#include "core/StateManager.h"
namespace org::apache::nifi::minifi::processors {
@@ -41,13 +41,13 @@ class StatefulProcessor : public core::Processor {
void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override;
void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&) override;
- using HookType = std::function<void(core::CoreComponentStateManager&)>;
+ using HookType = std::function<void(core::StateManager&)>;
void setHooks(HookType onScheduleHook, std::vector<HookType> onTriggerHooks);
[[nodiscard]] bool hasFinishedHooks() const;
private:
mutable std::mutex mutex_;
- core::CoreComponentStateManager* state_manager_;
+ core::StateManager* state_manager_;
HookType on_schedule_hook_;
std::vector<HookType> on_trigger_hooks_;
size_t on_trigger_hook_index_ = 0;
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 11246b7df..b5c48dc1d 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -198,16 +198,16 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo
stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
controller_services_ = std::make_shared<minifi::core::controller::ControllerServiceMap>();
controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
- /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */
+ /* Inject the default state storage ahead of ProcessContext to make sure we have a unique state directory */
if (state_dir == nullptr) {
state_dir_ = std::make_unique<TempDirectory>();
} else {
state_dir_ = std::make_unique<TempDirectory>(state_dir);
}
- if (!configuration_->get(minifi::Configure::nifi_state_management_provider_local_path)) {
- configuration_->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir_->getPath().string());
+ if (!configuration_->get(minifi::Configure::nifi_state_storage_local_path)) {
+ configuration_->set(minifi::Configure::nifi_state_storage_local_path, state_dir_->getPath().string());
}
- state_manager_provider_ = minifi::core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_.get(), configuration_);
+ state_storage_ = minifi::core::ProcessContext::getOrCreateDefaultStateStorage(controller_services_provider_.get(), configuration_);
}
TestPlan::~TestPlan() {
@@ -621,7 +621,7 @@ TestController::TestController()
std::shared_ptr<TestPlan> TestController::createPlan(PlanConfig config) {
if (!config.configuration) {
config.configuration = std::make_shared<minifi::Configure>();
- config.configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+ config.configuration->set(minifi::Configure::nifi_state_storage_local_class_name, "VolatileMapStateStorage");
config.configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory().string());
}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 58530a772..f6989309e 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -47,7 +47,7 @@ namespace core = minifi::core;
namespace org::apache::nifi::minifi {
class Connection;
namespace core {
-class CoreComponentStateManagerProvider;
+class StateStorage;
class ContentRepository;
class FlowFile;
class Processor;
@@ -275,8 +275,8 @@ class TestPlan {
return state_dir_->getPath();
}
- [[nodiscard]] std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider() const {
- return state_manager_provider_;
+ [[nodiscard]] std::shared_ptr<core::StateStorage> getStateStorage() const {
+ return state_storage_;
}
std::string getContent(const std::shared_ptr<const minifi::core::FlowFile>& file) const { return getContent(*file); }
@@ -303,7 +303,7 @@ class TestPlan {
std::shared_ptr<minifi::core::controller::ControllerServiceMap> controller_services_;
std::shared_ptr<minifi::core::controller::ControllerServiceProvider> controller_services_provider_;
- std::shared_ptr<minifi::core::CoreComponentStateManagerProvider> state_manager_provider_;
+ std::shared_ptr<minifi::core::StateStorage> state_storage_;
std::recursive_mutex mutex;
diff --git a/libminifi/test/flow-tests/CycleTest.cpp b/libminifi/test/flow-tests/CycleTest.cpp
index 889fd0176..1be43c293 100644
--- a/libminifi/test/flow-tests/CycleTest.cpp
+++ b/libminifi/test/flow-tests/CycleTest.cpp
@@ -96,9 +96,9 @@ Connections:
Remote Processing Groups:
Controller Services:
- - name: defaultstatemanagerprovider
+ - name: defaultstatestorage
id: 2438e3c8-015a-1000-79ca-83af40ec1994
- class: UnorderedMapPersistableKeyValueStoreService
+ class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
index 464b12d2f..517c863be 100644
--- a/libminifi/test/flow-tests/FlowControllerTests.cpp
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -83,9 +83,9 @@ Connections:
Remote Processing Groups:
Controller Services:
- - name: defaultstatemanagerprovider
+ - name: defaultstatestorage
id: 2438e3c8-015a-1000-79ca-83af40ec1995
- class: UnorderedMapPersistableKeyValueStoreService
+ class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/flow-tests/LoopTest.cpp b/libminifi/test/flow-tests/LoopTest.cpp
index aa7ca2c48..0d2fd0b22 100644
--- a/libminifi/test/flow-tests/LoopTest.cpp
+++ b/libminifi/test/flow-tests/LoopTest.cpp
@@ -74,9 +74,9 @@ Connections:
Remote Processing Groups:
Controller Services:
- - name: defaultstatemanagerprovider
+ - name: defaultstatestorage
id: 2438e3c8-015a-1000-79ca-83af40ec1996
- class: UnorderedMapPersistableKeyValueStoreService
+ class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/flow-tests/MultiLoopTest.cpp b/libminifi/test/flow-tests/MultiLoopTest.cpp
index e30b077b4..a64dabbe4 100644
--- a/libminifi/test/flow-tests/MultiLoopTest.cpp
+++ b/libminifi/test/flow-tests/MultiLoopTest.cpp
@@ -83,9 +83,9 @@ Connections:
Remote Processing Groups:
Controller Services:
- - name: defaultstatemanagerprovider
+ - name: defaultstatestorage
id: 2438e3c8-015a-1000-79ca-83af40ec1997
- class: UnorderedMapPersistableKeyValueStoreService
+ class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 2b674546b..8fd5a7553 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -146,7 +146,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_
if (test_file_location) {
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location->string());
}
- configuration->set(minifi::Configure::nifi_state_management_provider_local_class_name, "UnorderedMapKeyValueStoreService");
+ configuration->set(minifi::Configure::nifi_state_storage_local_class_name, "VolatileMapStateStorage");
configureC2();
configureFullHeartbeat();
@@ -179,10 +179,10 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_
auto controller_service_provider = flow_config->getControllerServiceProvider();
char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
state_dir = utils::file::create_temp_directory(state_dir_name_template);
- if (!configuration->get(minifi::Configure::nifi_state_management_provider_local_path)) {
- configuration->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir.string());
+ if (!configuration->get(minifi::Configure::nifi_state_storage_local_path)) {
+ configuration->set(minifi::Configure::nifi_state_storage_local_path, state_dir.string());
}
- core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider.get(), configuration);
+ core::ProcessContext::getOrCreateDefaultStateStorage(controller_service_provider.get(), configuration);
std::shared_ptr<core::ProcessGroup> pg(flow_config->getRoot());
queryRootProcessGroup(pg);
diff --git a/libminifi/test/integration/StateTransactionalityTests.cpp b/libminifi/test/integration/StateTransactionalityTests.cpp
index 299cf4c5e..97602a07c 100644
--- a/libminifi/test/integration/StateTransactionalityTests.cpp
+++ b/libminifi/test/integration/StateTransactionalityTests.cpp
@@ -131,10 +131,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
{"State_is_recorded_after_committing", {
{},
{
- [] (core::CoreComponentStateManager& stateManager) {
+ [] (core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [] (core::CoreComponentStateManager& stateManager) {
+ [] (core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -145,14 +145,14 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
{"State_is_discarded_after_rolling_back", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState2));
throw std::runtime_error("Triggering rollback");
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -162,7 +162,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
}},
{
"Get_in_onSchedule_without_previous_state", {
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
assert(state.empty());
@@ -173,11 +173,11 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
},
{
"Set_in_onSchedule", {
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -188,13 +188,13 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
},
{
"Clear_in_onSchedule", {
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(!stateManager.clear());
assert(stateManager.set(exampleState));
assert(stateManager.clear());
},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
assert(state.empty());
@@ -206,7 +206,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
{
"Persist_in_onSchedule", {
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
}
},
@@ -218,7 +218,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"Manual_beginTransaction", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(!stateManager.beginTransaction());
}
},
@@ -229,7 +229,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"Manual_commit", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
assert(stateManager.commit());
}
@@ -241,7 +241,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"Manual_rollback", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.rollback());
}
},
@@ -252,7 +252,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"Get_without_previous_state", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
assert(state.empty());
@@ -265,10 +265,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(get,get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -283,10 +283,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(get,set)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -300,10 +300,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(get,clear)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -317,10 +317,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(get,persist)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -334,7 +334,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set,!get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
@@ -348,7 +348,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set,set)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
assert(stateManager.set(exampleState));
},
@@ -360,7 +360,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set,!clear)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
assert(!stateManager.clear());
},
@@ -372,7 +372,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set,persist)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
assert(stateManager.persist());
},
@@ -384,10 +384,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear,!get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
@@ -401,21 +401,21 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear),(!get),(set),(get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(!stateManager.get(state));
assert(state.empty());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -428,10 +428,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear,set)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
assert(stateManager.set(exampleState));
},
@@ -443,13 +443,13 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear),(!clear)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(!stateManager.clear());
},
},
@@ -460,13 +460,13 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear),(persist)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
},
},
@@ -477,13 +477,13 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(persist),(set),(get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -496,13 +496,13 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(persist),(set),(clear)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
},
},
@@ -513,10 +513,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(persist),(persist)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
},
},
@@ -527,8 +527,8 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"No_change_2_rounds", {
{},
{
- [](core::CoreComponentStateManager&) {},
- [](core::CoreComponentStateManager&) {},
+ [](core::StateManager&) {},
+ [](core::StateManager&) {},
},
standardLogChecker
},
@@ -537,7 +537,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(!clear)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(!stateManager.clear());
},
},
@@ -548,10 +548,10 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(get,throw)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -565,14 +565,14 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear,throw),(get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.set(exampleState));
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.clear());
throw std::runtime_error("Triggering rollback");
},
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
std::unordered_map<std::string, std::string> state;
assert(stateManager.get(state));
assert(state == exampleState);
@@ -585,7 +585,7 @@ const std::unordered_map<std::string, HookCollection> testCasesToHookLists {
"(set),(clear,throw),(get)", {
{},
{
- [](core::CoreComponentStateManager& stateManager) {
+ [](core::StateManager& stateManager) {
assert(stateManager.persist());
throw std::runtime_error("Triggering rollback");
},
diff --git a/libminifi/test/keyvalue-tests/CMakeLists.txt b/libminifi/test/keyvalue-tests/CMakeLists.txt
index 83961cb60..b8c42bb7a 100644
--- a/libminifi/test/keyvalue-tests/CMakeLists.txt
+++ b/libminifi/test/keyvalue-tests/CMakeLists.txt
@@ -36,8 +36,8 @@ ENDFOREACH()
message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller test file(s)...")
-add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+add_test(NAME PersistentMapStateStorageTest COMMAND PersistentStateStorageTest --config-yaml "${TEST_RESOURCES}/PersistentMapStateStorage.yml")
if (NOT DISABLE_ROCKSDB)
- add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+ add_test(NAME RocksDbStateStorageTest COMMAND PersistentStateStorageTest --config-yaml "${TEST_RESOURCES}/RocksDbStateStorage.yml")
endif()
-add_test(NAME UnorderedMapKeyValueStoreServiceTest COMMAND UnorderedMapKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapKeyValueStoreServiceTest.yml")
+add_test(NAME VolatileMapStateStorageTest COMMAND VolatileMapStateStorageTest --config-yaml "${TEST_RESOURCES}/VolatileMapStateStorage.yml")
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
similarity index 80%
rename from libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
rename to libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
index 33e02d913..6c945ef3c 100644
--- a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/PersistentStateStorageTest.cpp
@@ -28,8 +28,8 @@
#include "core/controller/ControllerService.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
-#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
-#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
+#include "controllers/keyvalue/AutoPersistor.h"
#include "unit/ProvenanceTestHelper.h"
#include "repository/VolatileContentRepository.h"
@@ -41,7 +41,7 @@ int main(int argc, char* argv[]) {
auto cli = session.cli()
| Catch::clara::Opt{config_yaml, "config-yaml"}
["--config-yaml"]
- ("path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration");
+ ("path to the config.yaml containing the StateStorage controller service configuration");
session.cli(cli);
int ret = session.applyCommandLine(argc, argv);
@@ -50,25 +50,25 @@ int main(int argc, char* argv[]) {
}
if (config_yaml.empty()) {
- std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration." << std::endl;
+ std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the StateStorage controller service configuration." << std::endl;
return -1;
}
return session.run();
}
-class PersistableKeyValueStoreServiceTestsFixture {
+class PersistentStateStorageTestsFixture {
public:
- PersistableKeyValueStoreServiceTestsFixture() {
+ PersistentStateStorageTestsFixture() {
LogTestController::getInstance().setTrace<TestPlan>();
- LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
- LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+ LogTestController::getInstance().setTrace<minifi::controllers::KeyValueStateStorage>();
+ LogTestController::getInstance().setTrace<minifi::controllers::AutoPersistor>();
std::filesystem::current_path(testController.createTempDirectory());
loadYaml();
}
- virtual ~PersistableKeyValueStoreServiceTestsFixture() {
+ virtual ~PersistentStateStorageTestsFixture() {
LogTestController::getInstance().reset();
}
@@ -102,7 +102,7 @@ class PersistableKeyValueStoreServiceTestsFixture {
REQUIRE(persistable_key_value_store_service_node != nullptr);
persistable_key_value_store_service_node->enable();
- controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(
+ controller = std::dynamic_pointer_cast<minifi::controllers::KeyValueStateStorage>(
persistable_key_value_store_service_node->getControllerServiceImplementation());
REQUIRE(controller != nullptr);
}
@@ -118,13 +118,13 @@ class PersistableKeyValueStoreServiceTestsFixture {
std::unique_ptr<core::ProcessGroup> process_group;
std::shared_ptr<core::controller::ControllerServiceNode> persistable_key_value_store_service_node;
- std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+ std::shared_ptr<minifi::controllers::KeyValueStateStorage> controller;
TestController testController;
};
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and get", "[basic]") {
const char* key = "foobar";
const char* value = "234";
REQUIRE(true == controller->set(key, value));
@@ -141,7 +141,7 @@ TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyVal
REQUIRE(value == res);
}
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture special characters", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture special characters", "[basic]") {
const char* key = "[]{}()==\\=\n\n";
const char* value = ":./'\\=!\n=[]{}()";
REQUIRE(true == controller->set(key, value));
@@ -158,7 +158,7 @@ TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyVal
REQUIRE(value == res);
}
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get all", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and get all", "[basic]") {
const std::unordered_map<std::string, std::string> kvs = {
{"foobar", "234"},
{"buzz", "value"},
@@ -179,7 +179,7 @@ TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyVal
REQUIRE(kvs == kvs_res);
}
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and overwrite", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and overwrite", "[basic]") {
const char* key = "foobar";
const char* value = "234";
const char* new_value = "baz";
@@ -198,7 +198,7 @@ TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyVal
REQUIRE(new_value == res);
}
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and remove", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and remove", "[basic]") {
const char* key = "foobar";
const char* value = "234";
REQUIRE(true == controller->set(key, value));
@@ -216,7 +216,7 @@ TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyVal
}
-TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and clear", "[basic]") {
+TEST_CASE_METHOD(PersistentStateStorageTestsFixture, "PersistentStateStorageTestsFixture set and clear", "[basic]") {
const std::unordered_map<std::string, std::string> kvs = {
{"foobar", "234"},
{"buzz", "value"},
diff --git a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
similarity index 78%
rename from libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
rename to libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
index 6f0dc9f45..6edc57df0 100644
--- a/libminifi/test/keyvalue-tests/UnorderedMapKeyValueStoreServiceTest.cpp
+++ b/libminifi/test/keyvalue-tests/VolatileMapStateStorageTest.cpp
@@ -21,14 +21,13 @@
#include <string>
#include "../TestBase.h"
#include "../Catch.h"
-#include "core/controller/ControllerService.h"
+#include "controllers/keyvalue/AutoPersistor.h"
+#include "controllers/keyvalue/KeyValueStateStorage.h"
#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "unit/ProvenanceTestHelper.h"
#include "repository/VolatileContentRepository.h"
-#include "Catch.h"
-
namespace {
std::string config_yaml; // NOLINT
}
@@ -39,7 +38,7 @@ int main(int argc, char* argv[]) {
auto cli = session.cli()
| Catch::clara::Opt{config_yaml, "config-yaml"}
["--config-yaml"]
- ("path to the config.yaml containing the UnorderedMapKeyValueStoreServiceTest controller service configuration");
+ ("path to the config.yaml containing the VolatileMapStateStorageTest controller service configuration");
session.cli(cli);
int ret = session.applyCommandLine(argc, argv);
@@ -48,19 +47,19 @@ int main(int argc, char* argv[]) {
}
if (config_yaml.empty()) {
- std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the UnorderedMapKeyValueStoreServiceTest controller service configuration." << std::endl;
+ std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the VolatileMapStateStorageTest controller service configuration." << std::endl;
return -1;
}
return session.run();
}
-class UnorderedMapKeyValueStoreServiceTestFixture {
+class VolatileMapStateStorageTestFixture {
public:
- UnorderedMapKeyValueStoreServiceTestFixture() {
+ VolatileMapStateStorageTestFixture() {
LogTestController::getInstance().setTrace<TestPlan>();
- LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
- LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+ LogTestController::getInstance().setTrace<minifi::controllers::KeyValueStateStorage>();
+ LogTestController::getInstance().setTrace<minifi::controllers::AutoPersistor>();
std::filesystem::current_path(testController.createTempDirectory());
@@ -72,12 +71,12 @@ class UnorderedMapKeyValueStoreServiceTestFixture {
REQUIRE(key_value_store_service_node != nullptr);
key_value_store_service_node->enable();
- controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(
+ controller = std::dynamic_pointer_cast<minifi::controllers::KeyValueStateStorage>(
key_value_store_service_node->getControllerServiceImplementation());
REQUIRE(controller != nullptr);
}
- virtual ~UnorderedMapKeyValueStoreServiceTestFixture() {
+ virtual ~VolatileMapStateStorageTestFixture() {
LogTestController::getInstance().reset();
}
@@ -93,12 +92,12 @@ class UnorderedMapKeyValueStoreServiceTestFixture {
std::unique_ptr<core::ProcessGroup> process_group;
std::shared_ptr<core::controller::ControllerServiceNode> key_value_store_service_node;
- std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+ std::shared_ptr<minifi::controllers::KeyValueStateStorage> controller;
TestController testController;
};
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTest set and get", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTest set and get", "[basic]") {
const char* key = "foobar";
const char* value = "234";
REQUIRE(true == controller->set(key, value));
@@ -108,7 +107,7 @@ TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyVa
REQUIRE(value == res);
}
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and get all", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and get all", "[basic]") {
const std::unordered_map<std::string, std::string> kvs = {
{"foobar", "234"},
{"buzz", "value"},
@@ -122,7 +121,7 @@ TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyVa
REQUIRE(kvs == kvs_res);
}
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and overwrite", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and overwrite", "[basic]") {
const char* key = "foobar";
const char* value = "234";
const char* new_value = "baz";
@@ -134,7 +133,7 @@ TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyVa
REQUIRE(new_value == res);
}
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and remove", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and remove", "[basic]") {
const char* key = "foobar";
const char* value = "234";
REQUIRE(true == controller->set(key, value));
@@ -145,7 +144,7 @@ TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyVa
}
-TEST_CASE_METHOD(UnorderedMapKeyValueStoreServiceTestFixture, "UnorderedMapKeyValueStoreServiceTestFixture set and clear", "[basic]") {
+TEST_CASE_METHOD(VolatileMapStateStorageTestFixture, "VolatileMapStateStorageTestFixture set and clear", "[basic]") {
const std::unordered_map<std::string, std::string> kvs = {
{"foobar", "234"},
{"buzz", "value"},
diff --git a/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml b/libminifi/test/resources/PersistentMapStateStorage.yml
similarity index 95%
rename from libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/PersistentMapStateStorage.yml
index 61d8f2609..e1c11a484 100644
--- a/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/PersistentMapStateStorage.yml
@@ -26,7 +26,7 @@ Connections: []
Controller Services:
- name: testcontroller
id: 2438e3c8-015a-1000-79ca-83af40ec1994
- class: UnorderedMapPersistableKeyValueStoreService
+ class: PersistentMapStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml b/libminifi/test/resources/RocksDbStateStorage.yml
similarity index 95%
rename from libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/RocksDbStateStorage.yml
index 5819063c9..6600445d4 100644
--- a/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/RocksDbStateStorage.yml
@@ -26,7 +26,7 @@ Connections: []
Controller Services:
- name: testcontroller
id: 2438e3c8-015a-1000-79ca-83af40ec1994
- class: RocksDbPersistableKeyValueStoreService
+ class: RocksDbStateStorage
Properties:
Auto Persistence Interval:
- value: 0 sec
diff --git a/libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml b/libminifi/test/resources/VolatileMapStateStorage.yml
similarity index 95%
rename from libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml
rename to libminifi/test/resources/VolatileMapStateStorage.yml
index 45a6171c7..7f372cc4f 100644
--- a/libminifi/test/resources/UnorderedMapKeyValueStoreServiceTest.yml
+++ b/libminifi/test/resources/VolatileMapStateStorage.yml
@@ -26,6 +26,6 @@ Connections: []
Controller Services:
- name: testcontroller
id: 2438e3c8-015a-1000-79ca-83af40ec1994
- class: UnorderedMapKeyValueStoreService
+ class: VolatileMapStateStorage
Remote Processing Groups: []