You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/04/27 19:49:10 UTC
[nifi-minifi-cpp] branch master updated: MINIFICPP-550 - Implement
RocksDB controller service and component state storage
This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new fb2bf6a MINIFICPP-550 - Implement RocksDB controller service and component state storage
fb2bf6a is described below
commit fb2bf6ac31e48f88b5b01cd2b75a1b74ca5c1e30
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Thu Jul 4 12:30:37 2019 +0200
MINIFICPP-550 - Implement RocksDB controller service and component state storage
Signed-off-by: Arpad Boda <ab...@apache.org>
This closes #605
---
CMakeLists.txt | 3 +
conf/minifi.properties | 11 +
.../tests/C2DescribeCoreComponentStateTest.cpp | 116 +++++++
extensions/http-curl/tests/CMakeLists.txt | 1 +
extensions/rocksdb-repos/CMakeLists.txt | 2 +-
extensions/rocksdb-repos/RocksDBLoader.h | 5 +
.../RocksDbPersistableKeyValueStoreService.cpp | 193 +++++++++++
.../RocksDbPersistableKeyValueStoreService.h | 87 +++++
extensions/sftp/processors/ListSFTP.cpp | 385 +++++++++------------
extensions/sftp/processors/ListSFTP.h | 14 +-
extensions/sftp/tests/CMakeLists.txt | 3 +
extensions/sftp/tests/FetchSFTPTests.cpp | 4 +-
extensions/sftp/tests/ListSFTPTests.cpp | 119 ++++++-
extensions/sftp/tests/ListThenFetchSFTPTests.cpp | 4 +-
extensions/sftp/tests/PutSFTPTests.cpp | 6 +-
extensions/sql/processors/ExecuteSQL.cpp | 4 +-
extensions/sql/processors/ExecuteSQL.h | 4 +-
extensions/sql/processors/PutSQL.cpp | 2 +-
extensions/sql/processors/PutSQL.h | 2 +-
extensions/sql/processors/QueryDatabaseTable.cpp | 177 ++++------
extensions/sql/processors/QueryDatabaseTable.h | 8 +-
extensions/standard-processors/CMakeLists.txt | 2 +-
.../UnorderedMapKeyValueStoreService.cpp | 111 ++++++
.../controllers/UnorderedMapKeyValueStoreService.h | 75 ++++
...UnorderedMapPersistableKeyValueStoreService.cpp | 260 ++++++++++++++
.../UnorderedMapPersistableKeyValueStoreService.h | 88 +++++
.../standard-processors/processors/TailFile.cpp | 114 ++++--
.../standard-processors/processors/TailFile.h | 9 +-
.../tests/unit/TailFileTests.cpp | 254 +++++++-------
extensions/windows-event-log/Bookmark.cpp | 116 +++----
extensions/windows-event-log/Bookmark.h | 9 +-
.../windows-event-log/ConsumeWindowsEventLog.cpp | 13 +-
.../windows-event-log/ConsumeWindowsEventLog.h | 4 +-
.../windows-event-log/wel/UnicodeConversion.h | 5 +
libminifi/CMakeLists.txt | 2 +-
libminifi/include/FlowController.h | 10 +
.../AbstractAutoPersistingKeyValueStoreService.h | 75 ++++
.../AbstractCoreComponentStateManagerProvider.h | 76 ++++
.../controllers/keyvalue/KeyValueStoreService.h | 66 ++++
.../keyvalue/PersistableKeyValueStoreService.h | 56 +++
libminifi/include/core/ConfigurableComponent.h | 8 +-
libminifi/include/core/CoreComponentState.h | 67 ++++
libminifi/include/core/ProcessContext.h | 113 ++++++
.../include/core/controller/ControllerService.h | 3 +
.../core/controller/ControllerServiceProvider.h | 7 +
.../controller/StandardControllerServiceProvider.h | 14 +-
libminifi/include/properties/Configure.h | 5 +
libminifi/src/Configure.cpp | 3 +
libminifi/src/FlowController.cpp | 16 +
libminifi/src/c2/C2Agent.cpp | 47 +++
.../AbstractAutoPersistingKeyValueStoreService.cpp | 135 ++++++++
.../AbstractCoreComponentStateManagerProvider.cpp | 143 ++++++++
.../controllers/keyvalue/KeyValueStoreService.cpp | 52 +--
.../keyvalue/PersistableKeyValueStoreService.cpp | 60 ++++
libminifi/src/core/ConfigurableComponent.cpp | 13 +
libminifi/src/core/FlowConfiguration.cpp | 4 +
libminifi/test/TestBase.cpp | 58 +++-
libminifi/test/TestBase.h | 30 +-
libminifi/test/integration/IntegrationBase.h | 6 +
libminifi/test/keyvalue-tests/CMakeLists.txt | 42 +++
.../PersistableKeyValueStoreServiceTest.cpp | 254 ++++++++++++++
.../RocksDbPersistableKeyValueStoreServiceTest.yml | 23 +-
.../resources/TestC2DescribeCoreComponentState.yml | 46 +++
...deredMapPersistableKeyValueStoreServiceTest.yml | 23 +-
64 files changed, 2977 insertions(+), 690 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5283124..f394bd2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -705,6 +705,9 @@ if (NOT SKIP_TESTS)
include(BuildTests)
endif()
+## Add KeyValueStoreService tests (registered only when tests are built, like BuildTests above)
+if (NOT SKIP_TESTS)
+  registerTest("${TEST_DIR}/keyvalue-tests")
+endif()
+
include(BuildDocs)
include(DockerConfig)
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 3fa2c19..9580c75 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -37,6 +37,17 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
#nifi.rest.api.user.name=admin
#nifi.rest.api.password=password
+# State storage configuration #
+## The default state storage can be overridden by specifying a controller service instance
+## that implements CoreComponentStateManagementProvider
+## (e.g. an instance of RocksDbPersistableKeyValueStoreService or UnorderedMapPersistableKeyValueStoreService)
+#nifi.state.management.provider.local=
+## To make the default state storage persist every state change, set this to true.
+## This comes at a performance penalty, but ensures that no state is lost, even on unclean shutdowns.
+#nifi.state.management.provider.local.always.persist=true
+## To change the frequency at which the default state storage is persisted, modify the following property
+#nifi.state.management.provider.local.auto.persistence.interval=1 min
+
## Enabling C2 Uncomment each of the following options
## define those with missing options
#nifi.c2.enable=true
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
new file mode 100644
index 0000000..b0e7d7d
--- /dev/null
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <cassert>
+#include <cstring>
+#include <fstream>
+#include <memory>
+#include <string>
+#include "TestBase.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "processors/TailFile.h"
+#include "state/ProcessorController.h"
+#include "utils/file/FileUtils.h"
+
+/**
+ * C2 DESCRIBE test harness: prepares two input files in a temporary directory and
+ * points the TailFile1 / TailFile2 processors of the flow at them.
+ */
+class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {
+ public:
+  /**
+   * Creates the temporary directory and writes "foo" to test1.txt and "foobar" to test2.txt.
+   * @param isSecure forwarded to VerifyC2Describe
+   */
+  explicit VerifyC2DescribeCoreComponentState(bool isSecure)
+      : VerifyC2Describe(isSecure) {
+    char format[] = "/var/tmp/ssth.XXXXXX";
+    temp_dir_ = testController.createTempDirectory(format);
+
+    test_file_1_ = utils::file::FileUtils::concat_path(temp_dir_, "test1.txt");
+    test_file_2_ = utils::file::FileUtils::concat_path(temp_dir_, "test2.txt");
+
+    writeFile(test_file_1_, "foo");
+    writeFile(test_file_2_, "foobar");
+  }
+
+ protected:
+  /** Points each TailFile processor of the flow at its test input file. */
+  void updateProperties(std::shared_ptr<minifi::FlowController> flow_controller) override {
+    setTailFileName(flow_controller, "TailFile1", test_file_1_);
+    setTailFileName(flow_controller, "TailFile2", test_file_2_);
+  }
+
+  TestController testController;
+  std::string temp_dir_;
+  std::string test_file_1_;
+  std::string test_file_2_;
+
+ private:
+  /** Writes content to path; fails the test if the file could not be opened or written. */
+  static void writeFile(const std::string& path, const std::string& content) {
+    std::ofstream file(path);
+    file << content;
+    const bool written = file.good();
+    assert(written);
+  }
+
+  /**
+   * Sets TailFile::FileName on the named processor. Fails the test (instead of
+   * dereferencing null / indexing an empty list) when the processor is not in the flow.
+   */
+  static void setTailFileName(const std::shared_ptr<minifi::FlowController>& flow_controller,
+                              const std::string& processor_name, const std::string& file_name) {
+    auto components = flow_controller->getComponents(processor_name);
+    assert(!components.empty());
+    auto processor_controller = std::dynamic_pointer_cast<minifi::state::ProcessorController>(components[0]);
+    assert(processor_controller != nullptr);
+    processor_controller->getProcessor()->setProperty(minifi::processors::TailFile::FileName, file_name);
+  }
+};
+
+/**
+ * Heartbeat handler that answers every heartbeat with a DESCRIBE "corecomponentstate"
+ * operation and verifies the component state returned in the acknowledgement.
+ */
+class DescribeCoreComponentStateHandler: public HeartbeatHandler {
+ public:
+  explicit DescribeCoreComponentStateHandler(bool isSecure)
+      : HeartbeatHandler(isSecure) {
+  }
+
+  // `override` makes the build fail if the base signature drifts, instead of the
+  // checks below silently never running.
+  void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
+    sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
+  }
+
+  /**
+   * Asserts that the acknowledgement reports, for both TailFile processors, the tailed
+   * file name, the expected read position and a non-empty current file.
+   */
+  void handleAcknowledge(const rapidjson::Document& root) override {
+    assert(root.HasMember("corecomponentstate"));
+    const rapidjson::Value& state = root["corecomponentstate"];
+    assert(state.IsObject());
+
+    // Type-checks before GetString(), so a malformed payload fails with a clear assertion.
+    auto getStringMember = [](const rapidjson::Value& object, const char* member) -> std::string {
+      assert(object.HasMember(member));
+      assert(object[member].IsString());
+      return object[member].GetString();
+    };
+
+    auto assertExpectedTailFileState = [&](const char* uuid, const char* name, const char* position) {
+      assert(state.HasMember(uuid));
+      const rapidjson::Value& tf = state[uuid];
+      assert(tf.IsObject());
+      const std::string file_name = getStringMember(tf, "file.0.name");
+      assert(file_name == name);
+      const std::string file_position = getStringMember(tf, "file.0.position");
+      assert(file_position == position);
+      const std::string current = getStringMember(tf, "file.0.current");
+      assert(!current.empty());
+    };
+
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "3");
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "6");
+  }
+};
+
+/**
+ * Entry point. Optional arguments: argv[1] is the flow configuration to run,
+ * argv[2] is the key directory; passing it switches the heartbeat URL to HTTPS.
+ */
+int main(int argc, char **argv) {
+  std::string test_file_location;
+  std::string key_dir;
+  std::string url = "http://localhost:0/api/heartbeat";
+
+  if (argc > 1) {
+    test_file_location = argv[1];
+  }
+  if (argc > 2) {
+    key_dir = argv[2];
+    url = "https://localhost:0/api/heartbeat";
+  }
+
+  const bool isSecure = url.find("https") != std::string::npos;
+
+  VerifyC2DescribeCoreComponentState harness(isSecure);
+  harness.setKeyDir(key_dir);
+
+  DescribeCoreComponentStateHandler responder(isSecure);
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+  return 0;
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index ff848b5..f01c4f9 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -73,6 +73,7 @@ add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESO
add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2DescribeCoreComponentStateTest COMMAND C2DescribeCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
index 3c5395e..cd6b9ab 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -19,7 +19,7 @@
include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-file(GLOB SOURCES "*.cpp")
+file(GLOB SOURCES "*.cpp" "controllers/*.cpp")
add_library(minifi-rocksdb-repos STATIC ${SOURCES})
set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/extensions/rocksdb-repos/RocksDBLoader.h b/extensions/rocksdb-repos/RocksDBLoader.h
index 4d3f30f..5ed957f 100644
--- a/extensions/rocksdb-repos/RocksDBLoader.h
+++ b/extensions/rocksdb-repos/RocksDBLoader.h
@@ -22,6 +22,7 @@
#include "FlowFileRepository.h"
#include "ProvenanceRepository.h"
#include "RocksDbStream.h"
+#include "controllers/RocksDbPersistableKeyValueStoreService.h"
#include "core/ClassLoader.h"
class RocksDBFactory : public core::ObjectFactory {
@@ -50,9 +51,11 @@ class RocksDBFactory : public core::ObjectFactory {
class_names.push_back("DatabaseContentRepository");
class_names.push_back("FlowFileRepository");
class_names.push_back("ProvenanceRepository");
+ class_names.push_back("RocksDbPersistableKeyValueStoreService");
class_names.push_back("databasecontentrepository");
class_names.push_back("flowfilerepository");
class_names.push_back("provenancerepository");
+ class_names.push_back("rocksdbpersistablekeyvaluestoreservice");
return class_names;
}
@@ -65,6 +68,8 @@ class RocksDBFactory : public core::ObjectFactory {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::FlowFileRepository>());
} else if (name == "provenancerepository") {
return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::provenance::ProvenanceRepository>());
+ } else if (name == "rocksdbpersistablekeyvaluestoreservice") {
+ return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::controllers::RocksDbPersistableKeyValueStoreService>());
} else {
return nullptr;
}
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..ef04230
--- /dev/null
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDbPersistableKeyValueStoreService.h"
+
+#include "utils/StringUtils.h"
+
+#include <fstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Required property: filesystem directory in which the RocksDB database is stored.
+core::Property RocksDbPersistableKeyValueStoreService::Directory(
+    core::PropertyBuilder::createProperty("Directory")
+        ->withDescription("Path to a directory for the database")
+        ->isRequired(true)
+        ->build());
+
+// Constructs the service with a string id.
+// KeyValueStoreService is initialized explicitly next to the direct base; since it is not a
+// direct base of this class, it is presumably a virtual base of AbstractAutoPersistingKeyValueStoreService
+// (only direct or virtual bases may appear in a mem-initializer list).
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id)
+ : controllers::KeyValueStoreService(name, id)
+ , controllers::AbstractAutoPersistingKeyValueStoreService(name, id)
+ , logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
+}
+
+// Constructs the service with a utils::Identifier (the header defaults it to utils::Identifier()).
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+ : controllers::KeyValueStoreService(name, uuid)
+ , controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid)
+ , logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
+}
+
+/**
+ * Lets the base class register its properties, then adds the Directory property.
+ */
+void RocksDbPersistableKeyValueStoreService::initialize() {
+  AbstractAutoPersistingKeyValueStoreService::initialize();
+  std::set<core::Property> supported_properties{Directory};
+  updateSupportedProperties(supported_properties);
+}
+
+/**
+ * Opens (creating it if missing) the RocksDB database in the configured Directory.
+ *
+ * When always_persist_ is set, every write is synced; otherwise WAL writes are buffered
+ * (manual_wal_flush) until persist() flushes them. On any failure the error is logged and
+ * no database is open afterwards, so subsequent operations return false.
+ */
+void RocksDbPersistableKeyValueStoreService::onEnable() {
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+    return;
+  }
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  // Close any database from a previous enable before validating the configuration,
+  // so a failed re-enable does not keep serving a stale database.
+  db_.reset();
+
+  if (!getProperty(Directory.getName(), directory_)) {
+    logger_->log_error("Invalid or missing property: Directory");
+    return;
+  }
+
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.use_direct_reads = true;
+  // Use the same buffer settings as the FlowFileRepository
+  options.write_buffer_size = 8 << 20;
+  options.max_write_buffer_number = 20;
+  options.min_write_buffer_number_to_merge = 1;
+  // Without always-persist, WAL writes stay buffered until persist() calls FlushWAL().
+  options.manual_wal_flush = !always_persist_;
+
+  rocksdb::DB* db = nullptr;
+  const rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
+  if (!status.ok()) {
+    // Status::ToString() always yields a valid string; getState() may be null.
+    logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.ToString().c_str());
+    return;
+  }
+  db_.reset(db);
+  logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str());
+
+  // Assign (not just set to true) so re-enabling with always-persist turned off stops syncing.
+  default_write_options.sync = always_persist_;
+
+  logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
+}
+
+/** Lets the base class stop first, then closes the database handle. */
+void RocksDbPersistableKeyValueStoreService::notifyStop() {
+  AbstractAutoPersistingKeyValueStoreService::notifyStop();
+  db_ = nullptr;
+}
+
+/**
+ * Stores value under key.
+ * @return false if no database is open or the Put failed (the failure is logged)
+ */
+bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
+  if (!db_) {
+    return false;
+  }
+  const rocksdb::Status status = db_->Put(default_write_options, key, value);
+  if (!status.ok()) {
+    // ToString() instead of getState(): the latter may be null, which is UB for %s.
+    logger_->log_error("Failed to Put key %s to RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Reads the value stored under key into value.
+ * @return false if no database is open, the key does not exist, or the read failed
+ */
+bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::string& value) {
+  if (!db_) {
+    return false;
+  }
+  const rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
+  if (status.IsNotFound()) {
+    // A missing key is an expected outcome for state lookups, not an error.
+    logger_->log_debug("Key %s not found in RocksDB database at %s", key.c_str(), directory_.c_str());
+    return false;
+  }
+  if (!status.ok()) {
+    logger_->log_error("Failed to Get key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Replaces the contents of kvs with every key-value pair in the database.
+ * @return false if no database is open or iteration failed (kvs may then be partially filled)
+ */
+bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
+  if (!db_) {
+    return false;
+  }
+  kvs.clear();
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    kvs.emplace(it->key().ToString(), it->value().ToString());
+  }
+  // An iterator stops being Valid() both at the end and on error; status() tells them apart.
+  const rocksdb::Status status = it->status();
+  if (!status.ok()) {
+    logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), status.ToString().c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Deletes key from the database.
+ * @return false if no database is open or the Delete failed (the failure is logged)
+ */
+bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {
+  if (!db_) {
+    return false;
+  }
+  const rocksdb::Status status = db_->Delete(default_write_options, key);
+  if (!status.ok()) {
+    logger_->log_error("Failed to Delete key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Deletes every key in the database, one Delete per key.
+ * Deleting while iterating is safe because a RocksDB iterator reads a consistent
+ * point-in-time view. The operation is not atomic: on failure some keys may already be gone.
+ * @return false if no database is open, a Delete failed, or iteration failed
+ */
+bool RocksDbPersistableKeyValueStoreService::clear() {
+  if (!db_) {
+    return false;
+  }
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    const rocksdb::Status status = db_->Delete(default_write_options, it->key());
+    if (!status.ok()) {
+      logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.ToString().c_str());
+      return false;
+    }
+  }
+  const rocksdb::Status iteration_status = it->status();
+  if (!iteration_status.ok()) {
+    logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), iteration_status.ToString().c_str());
+    return false;
+  }
+  return true;
+}
+
+/**
+ * Read-modify-write is not implemented for the RocksDB backend.
+ * @return false if no database is open
+ * @throws std::logic_error whenever a database is open
+ */
+bool RocksDbPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  if (db_ == nullptr) {
+    return false;
+  }
+  throw std::logic_error("Unsupported method");
+}
+
+/**
+ * Makes buffered writes durable.
+ * In always-persist mode every write was already synced, so nothing is flushed;
+ * otherwise the WAL is flushed and synced to disk.
+ * @return false if no database is open or the WAL flush failed
+ */
+bool RocksDbPersistableKeyValueStoreService::persist() {
+  if (!db_) {
+    return false;
+  }
+  return always_persist_ || db_->FlushWAL(true /*sync*/).ok();
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
new file mode 100644
index 0000000..65ea554
--- /dev/null
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+
+#include <unordered_map>
+#include <string>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Persistable key-value store service backed by a RocksDB database located in the
+ * directory given by the Directory property.
+ */
+class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService {
+ public:
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  virtual ~RocksDbPersistableKeyValueStoreService() = default;
+
+  /// Path to the directory holding the database (required).
+  static core::Property Directory;
+
+  void initialize() override;
+  void onEnable() override;
+  void notifyStop() override;
+
+  bool set(const std::string& key, const std::string& value) override;
+  bool get(const std::string& key, std::string& value) override;
+  bool get(std::unordered_map<std::string, std::string>& kvs) override;
+  bool remove(const std::string& key) override;
+  bool clear() override;
+  /// Not implemented: throws std::logic_error while a database is open.
+  bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+  bool persist() override;
+
+ protected:
+  /// Value of the Directory property, read in onEnable().
+  std::string directory_;
+
+  /// Open database handle; null when no database is open.
+  std::unique_ptr<rocksdb::DB> db_;
+  /// Write options used by Put/Delete; sync is enabled when always persisting.
+  rocksdb::WriteOptions default_write_options;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(RocksDbPersistableKeyValueStoreService, "A key-value service implemented by RocksDB");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ */
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index 6a00d03..6ac89f2 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -125,10 +125,6 @@ core::Property ListSFTP::MinimumFileSize(
core::Property ListSFTP::MaximumFileSize(
core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file must be in order to be pulled")
->isRequired(false)->build());
-core::Property ListSFTP::StateFile(
- core::PropertyBuilder::createProperty("State File")->withDescription("Specifies the file that should be used for storing state about"
- " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
- ->isRequired(true)->withDefaultValue("ListSFTP")->build());
core::Relationship ListSFTP::Success("success", "All FlowFiles that are received are routed to success");
@@ -170,7 +166,6 @@ void ListSFTP::initialize() {
properties.insert(MaximumFileAge);
properties.insert(MinimumFileSize);
properties.insert(MaximumFileSize);
- properties.insert(StateFile);
setSupportedProperties(properties);
// Set the supported relationships
@@ -203,6 +198,11 @@ ListSFTP::~ListSFTP() {
void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
parseCommonPropertiesOnSchedule(context);
+ state_manager_ = context->getStateManager();
+ if (state_manager_ == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+
std::string value;
context->getProperty(ListingStrategy.getName(), listing_strategy_);
if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
@@ -270,45 +270,6 @@ void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
logger_->log_error("Maximum File Size attribute is invalid");
}
}
- context->getProperty(StateFile.getName(), value);
- if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
- std::stringstream ss;
- ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
- auto new_tracking_timestamps_state_filename = ss.str();
- if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {
- if (!tracking_timestamps_state_filename_.empty()) {
- if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
- logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
- tracking_timestamps_state_filename_.c_str());
- }
- }
- }
- tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
- } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
- std::stringstream ss;
- ss << value << "." << getUUIDStr() << ".TrackingEntities";
- auto new_tracking_entities_state_filename = ss.str();
- ss << ".json";
- auto new_tracking_entities_state_json_filename = ss.str();
- if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {
- if (!tracking_entities_state_filename_.empty()) {
- if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
- logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
- tracking_entities_state_filename_.c_str());
- }
- }
- if (!tracking_entities_state_json_filename_.empty()) {
- if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
- logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
- tracking_entities_state_json_filename_.c_str());
- }
- }
- }
- tracking_entities_state_filename_ = new_tracking_entities_state_filename;
- tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
- } else {
- logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
- }
startKeepaliveThreadIfNeeded();
}
@@ -520,31 +481,24 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
, size(size_) {
}
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
- std::ofstream file(tracking_timestamps_state_filename_);
- if (!file.is_open()) {
- logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
- return false;
- }
- file << "hostname=" << hostname << "\n";
- file << "username=" << username << "\n";
- file << "remote_path=" << remote_path << "\n";
- file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
- file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+bool ListSFTP::persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+ std::unordered_map<std::string, std::string> state;
+ state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+ state["hostname"] = hostname;
+ state["username"] = username;
+ state["remote_path"] = remote_path;
+ state["listing.timestamp"] = std::to_string(last_listed_latest_entry_timestamp_);
+ state["processed.timestamp"] = std::to_string(last_processed_latest_entry_timestamp_);
size_t i = 0;
for (const auto& identifier : latest_identifiers_processed_) {
- file << "id." << i << "=" << identifier << "\n";
+ state["id." + std::to_string(i)] = identifier;
++i;
}
- return true;
+ return state_manager_->set(state);
}
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
- std::ifstream file(tracking_timestamps_state_filename_);
- if (!file.is_open()) {
- logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
- return false;
- }
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+ std::string state_listing_strategy;
std::string state_hostname;
std::string state_username;
std::string state_remote_path;
@@ -552,53 +506,68 @@ bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, co
uint64_t state_processed_timestamp;
std::set<std::string> state_ids;
- std::string line;
- while (std::getline(file, line)) {
- size_t separator_pos = line.find('=');
- if (separator_pos == std::string::npos) {
- logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
- }
- std::string key = line.substr(0, separator_pos);
- std::string value = line.substr(separator_pos + 1);
- if (key == "hostname") {
- state_hostname = std::move(value);
- } else if (key == "username") {
- state_username = std::move(value);
- } else if (key == "remote_path") {
- state_remote_path = std::move(value);
- } else if (key == "listing.timestamp") {
- try {
- state_listing_timestamp = stoull(value);
- } catch (...) {
- logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
- return false;
- }
- } else if (key == "processed.timestamp") {
- try {
- state_processed_timestamp = stoull(value);
- } catch (...) {
- logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
- return false;
- }
- } else if (key.compare(0, strlen("id."), "id.") == 0) {
- state_ids.emplace(std::move(value));
- } else {
- logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ logger_->log_info("Found no stored state");
+ return false;
+ }
+
+ try {
+ state_listing_strategy = state_map.at("listing_strategy");
+ } catch (...) {
+ logger_->log_error("listing_strategy is missing from state");
+ return false;
+ }
+ try {
+ state_hostname = state_map.at("hostname");
+ } catch (...) {
+ logger_->log_error("hostname is missing from state");
+ return false;
+ }
+ try {
+ state_username = state_map.at("username");
+ } catch (...) {
+ logger_->log_error("username is missing from state");
+ return false;
+ }
+ try {
+ state_remote_path = state_map.at("remote_path");
+ } catch (...) {
+ logger_->log_error("remote_path is missing from state");
+ return false;
+ }
+ try {
+ state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
+ } catch (...) {
+ logger_->log_error("listing.timestamp is missing from state or is invalid");
+ return false;
+ }
+ try {
+ state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
+ } catch (...) {
+ logger_->log_error("processed.timestamp is missing from state or is invalid");
+ return false;
+ }
+ for (const auto &kv : state_map) {
+ if (kv.first.compare(0, strlen("id."), "id.") == 0) {
+ state_ids.emplace(kv.second);
}
}
- file.close();
- if (state_hostname != hostname ||
+ if (state_listing_strategy != listing_strategy_ ||
+ state_hostname != hostname ||
state_username != username ||
state_remote_path != remote_path) {
- logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
- "Hostname: \"%s\" vs. \"%s\", "
- "Username: \"%s\" vs. \"%s\", "
- "Remote Path: \"%s\" vs. \"%s\"",
- tracking_timestamps_state_filename_.c_str(),
- state_hostname, hostname,
- state_username, username,
- state_remote_path, remote_path);
+ logger_->log_error(
+ "Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"%s\" vs. \"%s\", "
+ "Hostname: \"%s\" vs. \"%s\", "
+ "Username: \"%s\" vs. \"%s\", "
+ "Remote Path: \"%s\" vs. \"%s\"",
+ state_listing_strategy, listing_strategy_,
+ state_hostname, hostname,
+ state_username, username,
+ state_remote_path, remote_path);
return false;
}
@@ -620,11 +589,11 @@ void ListSFTP::listByTrackingTimestamps(
uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
/* Load state from cache file if needed */
- if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty()) {
- if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
- logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ if (!already_loaded_from_cache_) {
+ if (updateFromTrackingTimestampsCache(context, hostname, username, remote_path)) {
+ logger_->log_debug("Successfully loaded state");
} else {
- logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+ logger_->log_debug("Failed to load state");
}
already_loaded_from_cache_ = true;
}
@@ -762,9 +731,7 @@ void ListSFTP::listByTrackingTimestamps(
if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
- if (!tracking_timestamps_state_filename_.empty()) {
- persistTrackingTimestampsCache(hostname, username, remote_path);
- }
+ persistTrackingTimestampsCache(context, hostname, username, remote_path);
}
} else {
logger_->log_debug("There are no files to list. Yielding.");
@@ -773,140 +740,100 @@ void ListSFTP::listByTrackingTimestamps(
}
}
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
- std::ofstream file(tracking_entities_state_filename_);
- if (!file.is_open()) {
- logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
- return false;
- }
- file << "hostname=" << hostname << "\n";
- file << "username=" << username << "\n";
- file << "remote_path=" << remote_path << "\n";
- file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
- file.close();
-
- std::ofstream json_file(tracking_entities_state_json_filename_);
- if (!json_file.is_open()) {
- logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
- return false;
- }
-
- rapidjson::Document entities(rapidjson::kObjectType);
- rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
- for (const auto& already_listed_entity : already_listed_entities_) {
- rapidjson::Value entity(rapidjson::kObjectType);
- entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc);
- entity.AddMember("size", already_listed_entity.second.size, alloc);
- entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+ std::unordered_map<std::string, std::string> state; // full key/value snapshot handed to state_manager_->set() at the end
+ state["listing_strategy"] = listing_strategy_; // compared with listing_strategy_ by updateFromTrackingEntitiesCache() on restore
+ state["hostname"] = hostname;
+ state["username"] = username;
+ state["remote_path"] = remote_path; // hostname/username/remote_path mismatches on restore cause the stored state to be ignored
+ size_t i = 0;
+ for (const auto &already_listed_entity : already_listed_entities_) { // each entity is flattened into entity.<i>.name / .timestamp / .size keys
+ state["entity." + std::to_string(i) + ".name"] = already_listed_entity.first;
+ state["entity." + std::to_string(i) + ".timestamp"] = std::to_string(already_listed_entity.second.timestamp);
+ state["entity." + std::to_string(i) + ".size"] = std::to_string(already_listed_entity.second.size);
+ ++i;
}
-
- rapidjson::OStreamWrapper osw(json_file);
- rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
- entities.Accept(writer);
-
- return true;
+ return state_manager_->set(state); // NOTE(review): the caller in listByTrackingEntities ignores this result
}
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
- std::ifstream file(tracking_entities_state_filename_);
- if (!file.is_open()) {
- logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
- return false;
- }
+bool ListSFTP::updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+ std::string state_listing_strategy;
std::string state_hostname;
std::string state_username;
std::string state_remote_path;
- std::string state_json_state_file;
+ std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
- std::string line;
- while (std::getline(file, line)) {
- size_t separator_pos = line.find('=');
- if (separator_pos == std::string::npos) {
- logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
- continue;
- }
- std::string key = line.substr(0, separator_pos);
- std::string value = line.substr(separator_pos + 1);
- if (key == "hostname") {
- state_hostname = std::move(value);
- } else if (key == "username") {
- state_username = std::move(value);
- } else if (key == "remote_path") {
- state_remote_path = std::move(value);
- } else if (key == "json_state_file") {
- state_json_state_file = std::move(value);
- } else {
- logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
- }
+ std::unordered_map<std::string, std::string> state_map;
+ if (!state_manager_->get(state_map)) {
+ logger_->log_debug("Failed to get state from StateManager");
+ return false;
}
- file.close();
- if (state_hostname != hostname ||
- state_username != username ||
- state_remote_path != remote_path) {
- logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. "
- "Hostname: \"%s\" vs. \"%s\", "
- "Username: \"%s\" vs. \"%s\", "
- "Remote Path: \"%s\" vs. \"%s\"",
- tracking_entities_state_filename_.c_str(),
- state_hostname, hostname,
- state_username, username,
- state_remote_path, remote_path);
+ try {
+ state_listing_strategy = state_map.at("listing_strategy");
+ } catch (...) {
+ logger_->log_error("listing_strategy is missing from state");
return false;
}
-
- if (state_json_state_file.empty()) {
- logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+ try {
+ state_hostname = state_map.at("hostname");
+ } catch (...) {
+ logger_->log_error("hostname is missing from state");
return false;
}
-
- std::ifstream json_file(state_json_state_file);
- if (!json_file.is_open()) {
- logger_->log_error("Failed to open entities Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+ try {
+ state_username = state_map.at("username");
+ } catch (...) {
+ logger_->log_error("username is missing from state");
return false;
}
-
try {
- rapidjson::IStreamWrapper isw(json_file);
- rapidjson::Document d;
- rapidjson::ParseResult res = d.ParseStream(isw);
- if (!res) {
- logger_->log_error("Failed to parse Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
- return false;
- }
- if (!d.IsObject()) {
- logger_->log_error("Tracking Entities state json file \"%s\" root is not an object", state_json_state_file.c_str());
- return false;
- }
+ state_remote_path = state_map.at("remote_path");
+ } catch (...) {
+ logger_->log_error("remote_path is missing from state");
+ return false;
+ }
- std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
- for (const auto &already_listed_entity : d.GetObject()) {
- auto it = already_listed_entity.value.FindMember("timestamp");
- if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
- logger_->log_error("Tracking Entities state json file \"%s\" timestamp missing or malformatted for entity \"%s\"",
- state_json_state_file.c_str(),
- already_listed_entity.name.GetString());
- continue;
- }
- uint64_t timestamp = it->value.GetUint64();
- it = already_listed_entity.value.FindMember("size");
- if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
- logger_->log_error("Tracking Entities state json file \"%s\" size missing or malformatted for entity \"%s\"",
- state_json_state_file.c_str(),
- already_listed_entity.name.GetString());
- continue;
- }
- uint64_t size = it->value.GetUint64();
+ // Rebuild the entity map from the flattened "entity.<i>.name" / ".timestamp" / ".size" keys
+ // written by persistTrackingEntitiesCache(); iteration stops at the first index with no ".name" key.
+ size_t i = 0;
+ while (true) {
+ std::string name;
+ try {
+ name = state_map.at("entity." + std::to_string(i) + ".name");
+ } catch (...) {
+ break;
+ }
+ try {
+ uint64_t timestamp = std::stoull(state_map.at("entity." + std::to_string(i) + ".timestamp"));
+ uint64_t size = std::stoull(state_map.at("entity." + std::to_string(i) + ".size"));
new_already_listed_entities.emplace(std::piecewise_construct,
- std::forward_as_tuple(already_listed_entity.name.GetString()),
+ std::forward_as_tuple(name),
std::forward_as_tuple(timestamp, size));
+ } catch (...) {
+ // A missing or unparsable timestamp/size drops only this entity (it is never emplaced).
+ logger_->log_error("State for entity \"%s\" is missing or invalid, skipping", name);
+ // No `continue` here: it would bypass the ++i below and re-read the same index
+ // forever. Falling through advances the loop to the next entity.
}
- already_listed_entities_ = std::move(new_already_listed_entities);
- } catch (std::exception& e) {
- logger_->log_error("Exception while parsing Tracking Entities state json file \"%s\": %s", state_json_state_file.c_str(), e.what());
+ ++i;
+ }
+
+ if (state_listing_strategy != listing_strategy_ ||
+ state_hostname != hostname ||
+ state_username != username ||
+ state_remote_path != remote_path) {
+ logger_->log_error(
+ "Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"%s\" vs. \"%s\", "
+ "Hostname: \"%s\" vs. \"%s\", "
+ "Username: \"%s\" vs. \"%s\", "
+ "Remote Path: \"%s\" vs. \"%s\"",
+ state_listing_strategy, listing_strategy_,
+ state_hostname, hostname,
+ state_username, username,
+ state_remote_path, remote_path);
return false;
}
+ already_listed_entities_ = std::move(new_already_listed_entities);
+
return true;
}
@@ -920,12 +847,12 @@ void ListSFTP::listByTrackingEntities(
uint64_t entity_tracking_time_window,
std::vector<Child>&& files) {
/* Load state from cache file if needed */
- if (!already_loaded_from_cache_ && !tracking_entities_state_filename_.empty()) {
- if (updateFromTrackingEntitiesCache(hostname, username, remote_path)) {
- logger_->log_debug("Successfully loaded Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+ if (!already_loaded_from_cache_) {
+ if (updateFromTrackingEntitiesCache(context, hostname, username, remote_path)) {
+ logger_->log_debug("Successfully loaded state");
initial_listing_complete_ = true;
} else {
- logger_->log_debug("Failed to load Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+ logger_->log_debug("Failed to load state");
}
already_loaded_from_cache_ = true;
}
@@ -1014,9 +941,7 @@ void ListSFTP::listByTrackingEntities(
initial_listing_complete_ = true;
- if (!tracking_entities_state_filename_.empty()) {
- persistTrackingEntitiesCache(hostname, username, remote_path);
- }
+ persistTrackingEntitiesCache(context, hostname, username, remote_path);
}
void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index 1efb754..e1601ed 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -35,6 +35,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "utils/RegexUtils.h"
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
#include "../client/SFTPClient.h"
namespace org {
@@ -81,7 +82,6 @@ class ListSFTP : public SFTPProcessorBase {
static core::Property MaximumFileAge;
static core::Property MinimumFileSize;
static core::Property MaximumFileSize;
- static core::Property StateFile;
// Supported Relationships
static core::Relationship Success;
@@ -104,6 +104,7 @@ class ListSFTP : public SFTPProcessorBase {
private:
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
std::string listing_strategy_;
bool search_recursively_;
bool follow_symlink_;
@@ -139,15 +140,12 @@ class ListSFTP : public SFTPProcessorBase {
bool already_loaded_from_cache_;
- std::string tracking_timestamps_state_filename_;
std::chrono::time_point<std::chrono::steady_clock> last_run_time_;
uint64_t last_listed_latest_entry_timestamp_;
uint64_t last_processed_latest_entry_timestamp_;
std::set<std::string> latest_identifiers_processed_;
bool initial_listing_complete_;
- std::string tracking_entities_state_filename_;
- std::string tracking_entities_state_json_filename_;
struct ListedEntity {
uint64_t timestamp;
uint64_t size;
@@ -170,11 +168,11 @@ class ListSFTP : public SFTPProcessorBase {
const std::string& username,
const Child& child);
- bool persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
- bool updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+ bool persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
+ bool updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
- bool persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
- bool updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+ bool persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
+ bool updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
void listByTrackingTimestamps(
const std::shared_ptr<core::ProcessContext>& context,
diff --git a/extensions/sftp/tests/CMakeLists.txt b/extensions/sftp/tests/CMakeLists.txt
index e12970c..e9936d3 100644
--- a/extensions/sftp/tests/CMakeLists.txt
+++ b/extensions/sftp/tests/CMakeLists.txt
@@ -46,6 +46,9 @@ if (NOT SKIP_TESTS AND Java_FOUND AND Maven_FOUND AND NOT DISABLE_EXPRESSION_LAN
target_wholearchive_library(${testfilename} minifi-sftp)
target_wholearchive_library(${testfilename} minifi-expression-language-extensions)
target_wholearchive_library(${testfilename} minifi-standard-processors)
+ if (NOT DISABLE_ROCKSDB)
+ target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+ endif()
MATH(EXPR SFTP-EXTENSIONS_TEST_COUNT "${SFTP-EXTENSIONS_TEST_COUNT}+1")
add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
index d7e95c7..89fea7c 100644
--- a/extensions/sftp/tests/FetchSFTPTests.cpp
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -59,8 +59,8 @@
class FetchSFTPTestsFixture {
public:
FetchSFTPTestsFixture()
- : src_dir(strdup("/tmp/sftps.XXXXXX"))
- , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+ : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+ , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index f82b010..74e06cb 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -58,7 +58,7 @@
class ListSFTPTestsFixture {
public:
ListSFTPTestsFixture()
- : src_dir(strdup("/tmp/sftps.XXXXXX")) {
+ : src_dir(strdup("/var/tmp/sftps.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -89,7 +89,13 @@ class ListSFTPTestsFixture {
}
void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
- plan = testController.createPlan();
+ const std::string state_dir = plan == nullptr ? "" : plan->getStateDir();
+
+ log_attribute.reset();
+ list_sftp.reset();
+ plan.reset();
+
+ plan = testController.createPlan(nullptr /*config*/, state_dir.empty() ? nullptr : state_dir.c_str());
if (list_sftp_uuid == nullptr) {
list_sftp = plan->addProcessor(
"ListSFTP",
@@ -124,7 +130,6 @@ class ListSFTPTestsFixture {
plan->setProperty(list_sftp, "Minimum File Size", "0 B");
plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
- plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
// Configure LogAttribute processor
plan->setProperty(log_attribute, "FlowFiles To Log", "0");
@@ -546,6 +551,16 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
testController.runSession(plan, true);
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+ REQUIRE("nifi_test/file1.ext" == state.at("id.0"));
+ REQUIRE(!state.at("listing.timestamp").empty());
+ REQUIRE(!state.at("processed.timestamp").empty());
+
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
@@ -559,9 +574,19 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Timestamps state file"));
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+ REQUIRE("nifi_test/file2.ext" == state.at("id.0"));
+ REQUIRE(!state.at("listing.timestamp").empty());
+ REQUIRE(!state.at("processed.timestamp").empty());
+
+ REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+ REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
@@ -574,6 +599,13 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+
utils::Identifier list_sftp_uuid;
REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
createPlan(&list_sftp_uuid);
@@ -585,7 +617,14 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("/nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+
+ REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
"Hostname: \"localhost\" vs. \"localhost\", "
"Username: \"nifiuser\" vs. \"nifiuser\", "
"Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
@@ -612,10 +651,11 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed con
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
- "Hostname: \"localhost\" vs. \"localhost\", "
- "Username: \"nifiuser\" vs. \"nifiuser\", "
- "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
@@ -814,13 +854,41 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+ REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
+ REQUIRE("14" == state.at("entity.0.size"));
+ REQUIRE(!state.at("entity.0.timestamp").empty());
+
createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Entities state file"));
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+ if (state.at("entity.0.name") == "nifi_test/file1.ext") {
+ REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
+ REQUIRE("nifi_test/file2.ext" == state.at("entity.1.name"));
+ } else {
+ REQUIRE("nifi_test/file2.ext" == state.at("entity.0.name"));
+ REQUIRE("nifi_test/file1.ext" == state.at("entity.1.name"));
+ }
+ REQUIRE("14" == state.at("entity.0.size"));
+ REQUIRE(!state.at("entity.0.timestamp").empty());
+ REQUIRE("14" == state.at("entity.1.size"));
+ REQUIRE(!state.at("entity.1.timestamp").empty());
+
+ REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+ REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
}
TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
@@ -833,6 +901,13 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+
utils::Identifier list_sftp_uuid;
REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
createPlan(&list_sftp_uuid);
@@ -844,10 +919,17 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
- "Hostname: \"localhost\" vs. \"localhost\", "
- "Username: \"nifiuser\" vs. \"nifiuser\", "
- "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+ REQUIRE("localhost" == state.at("hostname"));
+ REQUIRE("nifiuser" == state.at("username"));
+ REQUIRE("/nifi_test" == state.at("remote_path"));
+ REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+
+ REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
@@ -871,10 +953,11 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed confi
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
- "Hostname: \"localhost\" vs. \"localhost\", "
- "Username: \"nifiuser\" vs. \"nifiuser\", "
- "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+ REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+ "Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
+ "Hostname: \"localhost\" vs. \"localhost\", "
+ "Username: \"nifiuser\" vs. \"nifiuser\", "
+ "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
index 0282cb9..f5155ec 100644
--- a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -57,8 +57,8 @@
class ListThenFetchSFTPTestsFixture {
public:
ListThenFetchSFTPTestsFixture()
- : src_dir(strdup("/tmp/sftps.XXXXXX"))
- , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+ : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+ , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index 4d13a11..3c8bb1f 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -60,8 +60,8 @@
class PutSFTPTestsFixture {
public:
PutSFTPTestsFixture()
- : src_dir(strdup("/tmp/sftps.XXXXXX"))
- , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+ : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+ , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -736,7 +736,7 @@ TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP connection caching reaches limit"
std::vector<std::vector<char>> dst_dirs;
std::vector<std::unique_ptr<SFTPTestServer>> sftp_servers;
- std::string tmp_dir_format("/tmp/sftpd.XXXXXX");
+ std::string tmp_dir_format("/var/tmp/sftpd.XXXXXX");
for (size_t i = 0; i < 10; i++) {
dst_dirs.emplace_back(tmp_dir_format.data(), tmp_dir_format.data() + tmp_dir_format.size() + 1);
testController.createTempDirectory(dst_dirs.back().data());
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
index 7738408..fe0df48 100644
--- a/extensions/sql/processors/ExecuteSQL.cpp
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -81,14 +81,14 @@ void ExecuteSQL::initialize() {
setSupportedRelationships( { s_success });
}
-void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+// Initializes the output format via initOutputFormat() and caches the SQL select query and the
+// max rows per flow file properties; processOnTrigger() prepares and executes the cached query.
+void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
initOutputFormat(context);
context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
}
-void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
auto statement = connection_->prepareStatement(sqlSelectQuery_);
auto rowset = statement->execute();
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
index 764f038..2a9773d 100644
--- a/extensions/sql/processors/ExecuteSQL.h
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -46,9 +46,9 @@ class ExecuteSQL: public SQLProcessor<ExecuteSQL>, public OutputFormat {
//! Processor Name
static const std::string ProcessorName;
- void processOnSchedule(const core::ProcessContext& context);
+ void processOnSchedule(core::ProcessContext& context);
void processOnTrigger(core::ProcessSession& session);
-
+
void initialize() override;
static const core::Property s_sqlSelectQuery;
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index 0512cc9..5296760 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -73,7 +73,7 @@ void PutSQL::initialize() {
setSupportedRelationships( { s_success });
}
-void PutSQL::processOnSchedule(const core::ProcessContext& context) {
+void PutSQL::processOnSchedule(core::ProcessContext& context) {
std::string sqlStatements;
context.getProperty(s_sqlStatements.getName(), sqlStatements);
sqlStatements_ = utils::StringUtils::split(sqlStatements, ";");
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
index 284008a..6c5494d 100644
--- a/extensions/sql/processors/PutSQL.h
+++ b/extensions/sql/processors/PutSQL.h
@@ -45,7 +45,7 @@ class PutSQL: public SQLProcessor<PutSQL> {
//! Processor Name
static const std::string ProcessorName;
- void processOnSchedule(const core::ProcessContext &context);
+ void processOnSchedule(core::ProcessContext &context);
void processOnTrigger(core::ProcessSession &session);
void initialize() override;
diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp
index a7d7033..7cf7a41 100644
--- a/extensions/sql/processors/QueryDatabaseTable.cpp
+++ b/extensions/sql/processors/QueryDatabaseTable.cpp
@@ -25,7 +25,7 @@
#include <map>
#include <set>
#include <sstream>
-#include <stdio.h>
+#include <cstdio>
#include <string>
#include <iostream>
#include <memory>
@@ -93,7 +93,7 @@ const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
"If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
const core::Property QueryDatabaseTable::s_stateDirectory(
- core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+ core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
@@ -102,15 +102,18 @@ const core::Relationship QueryDatabaseTable::s_success("success", "Successfully
static const std::string ResultTableName = "tablename";
static const std::string ResultRowCount = "querydbtable.row.count";
+static const std::string TABLENAME_KEY = "tablename";
+static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
+
// State
-class State {
+class LegacyState {
public:
- State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+ LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
:tableName_(tableName), logger_(logger) {
- if (!createUUIDDir(stateDir, uuid, filePath_))
- return;
- filePath_ += "State.txt";
+ filePath_ = utils::file::FileUtils::concat_path(
+ utils::file::FileUtils::concat_path(
+ utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
if (!getStateFromFile())
return;
@@ -118,79 +121,30 @@ class State {
ok_ = true;
}
- ~State() {}
-
explicit operator bool() const {
return ok_;
}
- std::unordered_map<std::string, std::string> mapState() const {
+ const std::unordered_map<std::string, std::string>& getStateMap() const {
return mapState_;
}
- void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
- file_.seekp(std::ios::beg);
-
- file_ << tableName_ << separator();
- auto dataSize = tableName_.size() + separator().size();
-
- for (const auto& el : mapState) {
- file_ << el.first << '=' << el.second << separator();
- dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+ bool moveStateFileToMigrated() {
+ if (!ok_) {
+ return false;
}
-
- // If a maxValueColumnName type is varchar, a new max value 'dataSize' can be shorter than previous max value 'dataSize_' - clear difference with ' ' to keep file format.
- if (dataSize_ > dataSize) {
- for (auto i = dataSize_ - dataSize; i > 0; i--) {
- file_ << ' ';
- }
- }
- dataSize_ = dataSize;
-
- file_.flush();
-
- mapState_ = mapState;
+ return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
}
private:
- static const std::string& separator() {
- static const std::string s_separator = "@!qdt!@";
- return s_separator;
- }
-
- bool createUUIDDir(const std::string& stateDir, const std::string& uuid, std::string& dir)
- {
- if (stateDir.empty()) {
- dir.clear();
- return false;
- }
-
- const auto dirSeparator = utils::file::FileUtils::get_separator();
-
- auto dirWithSlash = stateDir;
- if (stateDir.back() != dirSeparator) {
- dirWithSlash += dirSeparator;
- }
-
- dir = dirWithSlash + "uuid" + dirSeparator + uuid + dirSeparator;
-
- utils::file::FileUtils::create_dir(dir);
-
- if (!utils::file::FileUtils::is_directory(dir.c_str())) {
- logger_->log_error("Cannot create %s", dir.c_str());
- dir.clear();
- return false;
- }
-
- return true;
- }
+ static const std::string separator_;
bool getStateFromFile() {
std::string state;
std::ifstream file(filePath_);
if (!file) {
- return createEmptyStateFile();
+ return false;
}
std::stringstream ss;
@@ -198,17 +152,15 @@ class State {
state = ss.str();
- dataSize_ = state.size();
-
file.close();
std::vector<std::string> listColumnNameValue;
- size_t pos = state.find(separator(), 0);
+ size_t pos = state.find(separator_, 0);
if (pos == std::string::npos) {
logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
mapState_.clear();
- return createEmptyStateFile();
+ return false;
}
auto tableName = state.substr(0, pos);
@@ -216,20 +168,20 @@ class State {
logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
mapState_.clear();
- return createEmptyStateFile();
+ return false;
}
- pos += separator().size();
+ pos += separator_.size();
while (true) {
- auto newPos = state.find(separator(), pos);
+ auto newPos = state.find(separator_, pos);
if (newPos == std::string::npos)
break;
const std::string& columnNameValue = state.substr(pos, newPos - pos);
listColumnNameValue.emplace_back(columnNameValue);
- pos = newPos + separator().size();
+ pos = newPos + separator_.size();
}
for (const auto& columnNameValue : listColumnNameValue) {
@@ -237,7 +189,7 @@ class State {
if (posEQ == std::string::npos) {
logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
mapState_.clear();
- return createEmptyStateFile();
+ return false;
}
const auto& name = columnNameValue.substr(0, posEQ);
@@ -246,25 +198,6 @@ class State {
mapState_.insert({ name, value });
}
- file_.open(filePath_);
- if (!file_.is_open()) {
- logger_->log_error("Cannot open %s", filePath_.c_str());
- mapState_.clear();
- return false;
- }
-
- return true;
- }
-
- bool createEmptyStateFile() {
- file_.open(filePath_, std::ios::out);
- if (!file_.is_open()) {
- logger_->log_error("Cannot open '%s' file", filePath_.c_str());
- return false;
- }
-
- dataSize_ = 0;
-
return true;
}
@@ -272,12 +205,11 @@ class State {
std::unordered_map<std::string, std::string> mapState_;
std::shared_ptr<logging::Logger> logger_;
std::string filePath_;
- std::fstream file_;
- size_t dataSize_{};
std::string tableName_;
bool ok_{};
};
+const std::string LegacyState::separator_ = "@!qdt!@";
// QueryDatabaseTable
QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
@@ -295,7 +227,7 @@ void QueryDatabaseTable::initialize() {
setSupportedRelationships( { s_success });
}
-void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context) {
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
initOutputFormat(context);
context.getProperty(s_tableName.getName(), tableName_);
@@ -308,19 +240,43 @@ void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context)
context.getProperty(s_sqlQuery.getName(), sqlQuery_);
context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
- std::string stateDir;
- context.getProperty(s_stateDirectory.getName(), stateDir);
- if (stateDir.empty()) {
- logger_->log_error("State Directory is empty");
- return;
+ mapState_.clear();
+
+ state_manager_ = context.getStateManager();
+ if (state_manager_ == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
- pState_ = std::make_unique<State>(tableName_, stateDir, getUUIDStr(), logger_);
- if (!*pState_) {
- return;
+ std::unordered_map<std::string, std::string> state_map;
+ if (state_manager_->get(state_map)) {
+ if (state_map[TABLENAME_KEY] != tableName_) {
+ state_manager_->clear();
+ } else {
+ for (auto&& elem : state_map) {
+ if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
+ mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
+ }
+ }
+ }
+ } else {
+ // Try to migrate legacy state file
+ std::string stateDir;
+ context.getProperty(s_stateDirectory.getName(), stateDir);
+ if (!stateDir.empty()) {
+ LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
+ if (legacyState) {
+ mapState_ = legacyState.getStateMap();
+ if (saveState() && state_manager_->persist()) {
+ logger_->log_info("State migration successful");
+ legacyState.moveStateFileToMigrated();
+ } else {
+ logger_->log_warn("Failed to persists migrated state");
+ }
+ } else {
+ logger_->log_warn("Could not migrate state from specified State Directory %s", stateDir);
+ }
+ }
}
-
- mapState_ = pState_->mapState();
// If 'listMaxValueColumnName_' doesn't match columns in mapState_, then clear mapState_.
if (listMaxValueColumnName_.size() != mapState_.size()) {
@@ -366,7 +322,7 @@ void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context)
}
}
-void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
+void QueryDatabaseTable::processOnTrigger(core::ProcessSession& session) {
const auto& selectQuery = getSelectQuery();
logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
@@ -408,7 +364,7 @@ void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
throw;
}
- pState_->writeStateToFile(mapState_);
+ saveState();
}
}
@@ -463,6 +419,15 @@ std::string QueryDatabaseTable::getSelectQuery() {
return ret;
}
+// Stores the current state through state_manager_: the table name under TABLENAME_KEY and each
+// max value from mapState_ under MAXVALUE_KEY_PREFIX + <column name>.
+// Returns the result of state_manager_->set(); it does not call persist() itself.
+bool QueryDatabaseTable::saveState() {
+  std::unordered_map<std::string, std::string> state_map;
+  state_map.emplace(TABLENAME_KEY, tableName_);
+  for (const auto& elem : mapState_) {
+    state_map.emplace(MAXVALUE_KEY_PREFIX + elem.first, elem.second);
+  }
+  return state_manager_->set(state_map);
+}
+
} /* namespace processors */
} /* namespace minifi */
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
index 412faab..44f1ed7 100644
--- a/extensions/sql/processors/QueryDatabaseTable.h
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -65,15 +65,18 @@ class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public Output
return true;
}
- void processOnSchedule(const core::ProcessContext& context);
+ void processOnSchedule(core::ProcessContext& context);
void processOnTrigger(core::ProcessSession& session);
-
+
void initialize() override;
private:
std::string getSelectQuery();
+ bool saveState();
+
private:
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
std::string tableName_;
std::string columnNames_;
std::string maxValueColumnNames_;
@@ -83,7 +86,6 @@ class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public Output
std::vector<std::string> listMaxValueColumnName_;
std::unordered_map<std::string, std::string> mapState_;
std::unordered_map<std::string, soci::data_type> mapColumnType_;
- std::unique_ptr<State> pState_;
};
REGISTER_RESOURCE(QueryDatabaseTable, "QueryDatabaseTable to execute SELECT statement via ODBC.");
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index d49a29f..f4ca71d 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -20,7 +20,7 @@
include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
-file(GLOB SOURCES "processors/*.cpp" )
+file(GLOB SOURCES "processors/*.cpp" "controllers/*.cpp" )
add_library(minifi-standard-processors STATIC ${SOURCES})
set_property(TARGET minifi-standard-processors PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
new file mode 100644
index 0000000..bd654a3
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapKeyValueStoreService.h"
+
+#include <exception>
+#include <functional>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::string& id)
+  : KeyValueStoreService(name, id)
+  , logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+  : KeyValueStoreService(name, uuid)
+  , logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+}
+
+// Applies the given configuration and immediately calls initialize().
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure> &configuration)
+  : KeyValueStoreService(name)
+  , logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+  setConfiguration(configuration);
+  initialize();
+}
+
+UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() = default;
+
+bool UnorderedMapKeyValueStoreService::set(const std::string& key, const std::string& value) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  map_[key] = value;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::get(const std::string& key, std::string& value) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const auto it = map_.find(key);
+  if (it == map_.end()) {
+    return false;
+  }
+  value = it->second;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  kvs = map_;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  return map_.erase(key) == 1U;
+}
+
+bool UnorderedMapKeyValueStoreService::clear() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  map_.clear();
+  return true;
+}
+
+// Runs update_func on a copy of the current value (empty if the key is absent) and stores the
+// result only if update_func returns true. A throwing update_func is logged and treated as a refusal.
+bool UnorderedMapKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  const auto it = map_.find(key);
+  const bool exists = it != map_.end();
+  std::string value = exists ? it->second : std::string();
+  try {
+    if (!update_func(exists, value)) {
+      return false;
+    }
+  } catch (const std::exception& e) {
+    logger_->log_error("update_func failed with an exception: %s", e.what());
+    return false;
+  } catch (...) {
+    logger_->log_error("update_func failed with an exception");
+    return false;
+  }
+  // Look the key up again instead of reusing `it`: mutex_ is recursive, so update_func may have
+  // re-entered this service on the same thread and modified map_, invalidating earlier iterators.
+  map_[key] = std::move(value);
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
new file mode 100644
index 0000000..e41e915
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPKEYVALUESTORESERVICE_H_
+#define EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPKEYVALUESTORESERVICE_H_
+
+#include "controllers/keyvalue/KeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * KeyValueStoreService backed by an in-memory std::unordered_map<std::string, std::string>.
+ *
+ * Every operation holds mutex_ for its whole duration. The mutex is recursive so that subclasses
+ * (e.g. UnorderedMapPersistableKeyValueStoreService) can lock it themselves and then call back
+ * into these base implementations.
+ */
+class UnorderedMapKeyValueStoreService : virtual public KeyValueStoreService {
+ public:
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  /** Applies the given configuration and immediately calls initialize(). */
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure>& configuration);
+
+  virtual ~UnorderedMapKeyValueStoreService();
+
+  /** Stores value under key, replacing any previous value. Always returns true. */
+  bool set(const std::string& key, const std::string& value) override;
+
+  /** Copies the value stored under key into value; returns false if the key is absent. */
+  bool get(const std::string& key, std::string& value) override;
+
+  /** Replaces kvs with a snapshot of every stored key-value pair. Always returns true. */
+  bool get(std::unordered_map<std::string, std::string>& kvs) override;
+
+  /** Removes key; returns true only if it was present. */
+  bool remove(const std::string& key) override;
+
+  /** Removes every key. Always returns true. */
+  bool clear() override;
+
+  /**
+   * Reads, modifies and writes the value under key while holding mutex_.
+   *
+   * update_func receives whether key exists and a copy of its current value (empty if absent);
+   * the possibly modified value is stored only if update_func returns true. Returns false, leaving
+   * the map untouched, if update_func returns false or throws.
+   */
+  bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
+ protected:
+  std::unordered_map<std::string, std::string> map_;
+  std::recursive_mutex mutex_;
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(UnorderedMapKeyValueStoreService, "A key-value service implemented by a locked std::unordered_map<std::string, std::string>");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif  // EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPKEYVALUESTORESERVICE_H_
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..b41baf1
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapPersistableKeyValueStoreService.h"
+
+#include <fstream>
+#include <functional>
+#include <mutex>
+#include <set>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+constexpr int UnorderedMapPersistableKeyValueStoreService::FORMAT_VERSION;
+
+core::Property UnorderedMapPersistableKeyValueStoreService::File(
+    core::PropertyBuilder::createProperty("File")->withDescription("Path to a file to store state")
+        ->isRequired(true)->build());
+
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::string& id)
+  : KeyValueStoreService(name, id)
+  , AbstractAutoPersistingKeyValueStoreService(name, id)
+  , UnorderedMapKeyValueStoreService(name, id)
+  , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+  : KeyValueStoreService(name, uuid)
+  , AbstractAutoPersistingKeyValueStoreService(name, uuid)
+  , UnorderedMapKeyValueStoreService(name, uuid)
+  , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger()) {
+}
+
+// Applies the given configuration and immediately calls initialize().
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure> &configuration)
+  : KeyValueStoreService(name)
+  , AbstractAutoPersistingKeyValueStoreService(name)
+  , UnorderedMapKeyValueStoreService(name)
+  , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger()) {
+  setConfiguration(configuration);
+  initialize();
+}
+
+// Writes the current contents to File one last time before the service goes away.
+UnorderedMapPersistableKeyValueStoreService::~UnorderedMapPersistableKeyValueStoreService() {
+  persist();
+}
+
+// Escapes the characters with special meaning in the state file format (backslash, newline and
+// '=') so that every key and value fits on a single "key=value" line. Inverse of parseLine().
+std::string UnorderedMapPersistableKeyValueStoreService::escape(const std::string& str) {
+  std::stringstream escaped;
+  for (const auto c : str) {
+    switch (c) {
+      case '\\':
+        escaped << "\\\\";
+        break;
+      case '\n':
+        escaped << "\\n";
+        break;
+      case '=':
+        escaped << "\\=";
+        break;
+      default:
+        escaped << c;
+        break;
+    }
+  }
+  return escaped.str();
+}
+
+// Splits one state file line at its first unescaped '=' into an unescaped key and value.
+// Returns false (and logs why) on an invalid escape sequence, a second unescaped '=',
+// a trailing backslash, a missing '=' or an empty key.
+bool UnorderedMapPersistableKeyValueStoreService::parseLine(const std::string& line, std::string& key, std::string& value) {
+  std::stringstream key_ss;
+  std::stringstream value_ss;
+  bool in_escape_sequence = false;
+  bool key_complete = false;
+  for (const auto c : line) {
+    auto& current = key_complete ? value_ss : key_ss;
+    if (in_escape_sequence) {
+      switch (c) {
+        case '\\':
+          current << '\\';
+          break;
+        case 'n':
+          current << '\n';
+          break;
+        case '=':
+          current << '=';
+          break;
+        default:
+          logger_->log_error("Invalid escape sequence in \"%s\": \"\\%c\"", line.c_str(), c);
+          return false;
+      }
+      in_escape_sequence = false;
+    } else {
+      if (c == '\\') {
+        in_escape_sequence = true;
+      } else if (c == '=') {
+        if (key_complete) {
+          logger_->log_error("Unescaped '=' in line \"%s\"", line.c_str());
+          return false;
+        } else {
+          key_complete = true;
+        }
+      } else {
+        current << c;
+      }
+    }
+  }
+  if (in_escape_sequence) {
+    logger_->log_error("Unterminated escape sequence in \"%s\"", line.c_str());
+    return false;
+  }
+  if (!key_complete) {
+    logger_->log_error("Key not found in \"%s\"", line.c_str());
+    return false;
+  }
+  key = key_ss.str();
+  if (key.empty()) {
+    logger_->log_error("Line with empty key found in \"%s\": \"%s\"", file_.c_str(), line.c_str());
+    return false;
+  }
+  value = value_ss.str();
+  return true;
+}
+
+// Runs the base class initialize() and then passes the File property to updateSupportedProperties().
+void UnorderedMapPersistableKeyValueStoreService::initialize() {
+  AbstractAutoPersistingKeyValueStoreService::initialize();
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(File);
+  updateSupportedProperties(supportedProperties);
+}
+
+// Reads the File property and loads previously persisted state before enabling the base class; does nothing without a configuration or File.
+void UnorderedMapPersistableKeyValueStoreService::onEnable() {
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable UnorderedMapPersistableKeyValueStoreService");
+    return;
+  }
+
+  if (!getProperty(File.getName(), file_)) {
+    logger_->log_error("Invalid or missing property: File");
+    return;
+  }
+
+  /* We must not start the persistence thread until we attempted to load the state */
+  load();
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  logger_->log_trace("Enabled UnorderedMapPersistableKeyValueStoreService");
+}
+
+// Lets the base class stop first (presumably its background persistence), then writes the state one final time.
+void UnorderedMapPersistableKeyValueStoreService::notifyStop() {
+  AbstractAutoPersistingKeyValueStoreService::notifyStop();
+  persist();
+}
+
+// The mutators below delegate to UnorderedMapKeyValueStoreService and, when always_persist_ is set
+// and the change succeeded, rewrite File immediately. mutex_ is held across both steps, so no other
+// locked operation can run between the in-memory update and the write.
+bool UnorderedMapPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  bool res = UnorderedMapKeyValueStoreService::set(key, value);
+  if (always_persist_ && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::remove(const std::string& key) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  bool res = UnorderedMapKeyValueStoreService::remove(key);
+  if (always_persist_ && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::clear() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  bool res = UnorderedMapKeyValueStoreService::clear();
+  if (always_persist_ && res) {
+    return persist();
+  }
+  return res;
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  bool res = UnorderedMapKeyValueStoreService::update(key, update_func);
+  if (always_persist_ && res) {
+    return persist();
+  }
+  return res;
+}
+
+// Rewrites File from scratch: a format version line followed by one escaped "key=value" line per
+// entry. Returns false if no File is configured, it cannot be opened, or writing to it fails.
+bool UnorderedMapPersistableKeyValueStoreService::persist() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  if (file_.empty()) {
+    // Nothing to write to, e.g. when onEnable() never set File before the destructor runs.
+    logger_->log_debug("No file configured, not persisting state");
+    return false;
+  }
+  std::ofstream ofs(file_);
+  if (!ofs.is_open()) {
+    logger_->log_error("Failed to open file \"%s\" to store state", file_.c_str());
+    return false;
+  }
+  ofs << escape(FORMAT_VERSION_KEY) << "=" << escape(std::to_string(FORMAT_VERSION)) << "\n";
+  for (const auto& kv : map_) {
+    ofs << escape(kv.first) << "=" << escape(kv.second) << "\n";
+  }
+  // Surface write errors (e.g. a full disk) instead of reporting success for a truncated file.
+  ofs.flush();
+  if (!ofs) {
+    logger_->log_error("Failed to write state to file \"%s\"", file_.c_str());
+    return false;
+  }
+  return true;
+}
+
+// Replaces map_ with the contents of File; returns false, leaving map_ unchanged, if File cannot be opened or
+// declares an invalid or newer-than-supported format version. Other unparsable lines are logged and skipped.
+bool UnorderedMapPersistableKeyValueStoreService::load() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::ifstream ifs(file_);
+  if (!ifs.is_open()) {
+    logger_->log_debug("Failed to open file \"%s\" to load state", file_.c_str());
+    return false;
+  }
+  std::unordered_map<std::string, std::string> map;
+  std::string line;
+  while (std::getline(ifs, line)) {
+    std::string key, value;
+    if (!parseLine(line, key, value)) {
+      continue;
+    }
+    if (key == FORMAT_VERSION_KEY) {
+      int format_version = 0;
+      try {
+        format_version = std::stoi(value);
+      } catch (...) {
+        logger_->log_error("Invalid format version number found in \"%s\": \"%s\"", file_.c_str(), value.c_str());
+        return false;
+      }
+      if (format_version > FORMAT_VERSION) {
+        logger_->log_error("\"%s\" has been serialized with a larger format version than currently known: %d > %d", file_.c_str(), format_version, FORMAT_VERSION);
+        return false;
+      }
+    } else {
+      map[key] = value;
+    }
+  }
+  map_ = std::move(map);
+  logger_->log_debug("Loaded state from \"%s\"", file_.c_str());
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
new file mode 100644
index 0000000..f098d7f
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPPERSISTABLEKEYVALUESTORESERVICE_H_
+#define EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPPERSISTABLEKEYVALUESTORESERVICE_H_
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "UnorderedMapKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <functional>
+#include <unordered_map>
+#include <string>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Key-value store service that keeps its entries in the locked std::unordered_map of
+ * UnorderedMapKeyValueStoreService and persists them into a file.
+ */
+class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService,
+                                                    public UnorderedMapKeyValueStoreService {
+ public:
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure>& configuration);
+
+  virtual ~UnorderedMapPersistableKeyValueStoreService();
+
+  // Property configuring the persistence file.
+  static core::Property File;
+
+  void onEnable() override;
+  void initialize() override;
+  void notifyStop() override;
+
+  // Overrides of the key-value store operations.
+  bool set(const std::string& key, const std::string& value) override;
+
+  bool remove(const std::string& key) override;
+
+  bool clear() override;
+
+  bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
+  // Writes the format version followed by every entry of the map into file_, one escaped "key=value" per line.
+  // Returns false on failure.
+  bool persist() override;
+
+ protected:
+  // Reserved key under which the serialization format version is stored in the file.
+  static constexpr const char* FORMAT_VERSION_KEY = "__UnorderedMapPersistableKeyValueStoreService_FormatVersion";
+  // Current serialization format version; load() refuses files written with a larger one.
+  static constexpr int FORMAT_VERSION = 1;
+
+  // Path of the file used for persistence.
+  std::string file_;
+
+  // Replaces the in-memory map with the contents of file_. Returns false if the file cannot be opened
+  // or its format version is invalid or newer than FORMAT_VERSION.
+  bool load();
+
+  // Escapes a key or value for the line-based "key=value" file format.
+  std::string escape(const std::string& str);
+  // Splits a line of the file into key and value; returns false if the line cannot be parsed (load() skips such lines).
+  bool parseLine(const std::string& line, std::string& key, std::string& value);
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(UnorderedMapPersistableKeyValueStoreService, "A persistable key-value service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* EXTENSIONS_STANDARD_PROCESSORS_CONTROLLERS_UNORDEREDMAPPERSISTABLEKEYVALUESTORESERVICE_H_ */
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 9e7bbf1..0a52a93 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -69,8 +69,7 @@ namespace minifi {
namespace processors {
core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
- " what data has been ingested so that upon restart NiFi can resume from where it left off",
+core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for state migration from the legacy state file.",
"TailFileState");
core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
"from the incoming file."
@@ -113,6 +112,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
tail_states_.clear();
state_recovered_ = false;
+ state_manager_ = context->getStateManager();
+ if (state_manager_ == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+
std::string value;
if (context->getProperty(Delimiter.getName(), value)) {
@@ -202,7 +206,7 @@ void TailFile::parseStateFileLine(char *buf) {
if (key == "FILENAME") {
std::string fileLocation, fileName;
if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
- logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+ logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
} else {
tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
@@ -232,20 +236,69 @@ void TailFile::parseStateFileLine(char *buf) {
const auto file = key.substr(strlen(POSITION_STR));
tail_states_[file].currentTailFilePosition_ = std::stoull(value);
}
-
- return;
}
-bool TailFile::recoverState() {
- std::ifstream file(state_file_.c_str(), std::ifstream::in);
- if (!file.good()) {
- logger_->log_error("load state file failed %s", state_file_);
- return false;
+
+
+/**
+ * Restores tail_states_ from the StateManager. If no stored state is available, falls back to migrating
+ * the legacy state file (state_file_); the function then writes the recovered state back via storeState().
+ */
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
+  bool state_load_success = false;
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map)) {
+    std::map<std::string, TailState> new_tail_states;
+    // Entries are stored by storeState() as file.<i>.name / file.<i>.current / file.<i>.position;
+    // the first index without a ".name" entry ends the list.
+    for (size_t i = 0;; ++i) {
+      const std::string prefix = "file." + std::to_string(i);
+      const auto name_it = state_map.find(prefix + ".name");
+      if (name_it == state_map.end()) {
+        break;
+      }
+      // Key the recovered entry by the stored name (the key storeState() wrote), not by the name of the
+      // file currently being tailed, which differs from it after a rollover.
+      const std::string& name = name_it->second;
+      try {
+        const std::string& current = state_map.at(prefix + ".current");
+        uint64_t position = std::stoull(state_map.at(prefix + ".position"));
+
+        std::string fileLocation, fileName;
+        if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
+          logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+          new_tail_states.emplace(name, TailState { fileLocation, fileName, position, 0 });
+        } else {
+          new_tail_states.emplace(name, TailState { fileLocation, current, position, 0 });
+        }
+      } catch (...) {
+        // Skip an incomplete or malformed entry. The increment lives in the loop header so that
+        // skipping still advances to the next index instead of retrying the same one forever.
+        logger_->log_warn("Ignoring invalid stored state for \"%s\"", name);
+      }
+    }
+    state_load_success = true;
+    tail_states_ = std::move(new_tail_states);
+    for (const auto& s : tail_states_) {
+      logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_);
+    }
+  } else {
+    logger_->log_info("Found no stored state");
}
- tail_states_.clear();
- char buf[BUFFER_SIZE];
- for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
- parseStateFileLine(buf);
+
+  /* We could not get the state from the StateManager, try to migrate the old state file if it exists */
+  if (!state_load_success) {
+    std::ifstream file(state_file_.c_str(), std::ifstream::in);
+    if (!file.good()) {
+      logger_->log_error("load state file failed %s", state_file_);
+      return false;
+    }
+    tail_states_.clear();
+    char buf[BUFFER_SIZE];
+    for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
+      parseStateFileLine(buf);
+    }
}
/**
@@ -264,28 +309,34 @@ bool TailFile::recoverState() {
}
}
- logger_->log_debug("load state file succeeded for %s", state_file_);
+ logger_->log_debug("load state succeeded");
+
+ /* Save the state to the state manager */
+ storeState(context);
+
return true;
}
-void TailFile::storeState() {
- std::ofstream file(state_file_.c_str());
- if (!file.is_open()) {
- logger_->log_error("store state file failed %s", state_file_);
- return;
+bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context) {
+ std::unordered_map<std::string, std::string> state;
+ size_t i = 0;
+ for (const auto& tail_state : tail_states_) {
+ state["file." + std::to_string(i) + ".name"] = tail_state.first;
+ state["file." + std::to_string(i) + ".current"] = utils::file::FileUtils::concat_path(tail_state.second.path_, tail_state.second.current_file_name_);
+ state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.currentTailFilePosition_);
+ ++i;
}
- for (const auto &state : tail_states_) {
- file << "FILENAME=" << state.first << "\n";
- file << CURRENT_STR << state.first << "=" << state.second.path_ << utils::file::FileUtils::get_separator() << state.second.current_file_name_ << "\n";
- file << POSITION_STR << state.first << "=" << state.second.currentTailFilePosition_ << "\n";
+ if (!state_manager_->set(state)) {
+ logger_->log_error("Failed to set state");
+ return false;
}
- file.close();
+ return true;
}
static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
return (i.modifiedTime < j.modifiedTime);
}
-void TailFile::checkRollOver(TailState &file, const std::string &base_file_name) {
+void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name) {
struct stat statbuf;
std::vector<TailMatchedFileItem> matchedFiles;
std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_;
@@ -339,7 +390,7 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
file.current_file_name_ = item.fileName;
- storeState();
+ storeState(context);
}
}
@@ -352,7 +403,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
if (!this->state_recovered_) {
state_recovered_ = true;
// recover the state if we have not done so
- this->recoverState();
+ this->recoverState(context);
}
/**
@@ -361,8 +412,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
for (auto &state : tail_states_) {
auto fileLocation = state.second.path_;
- logger_->log_debug("Tailing file %s from %llu", fileLocation, state.second.currentTailFilePosition_);
- checkRollOver(state.second, state.first);
+ checkRollOver(context, state.second, state.first);
std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
struct stat statbuf;
@@ -414,7 +464,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
ffr->updateKeyedAttribute(FILENAME, logName);
session->transfer(ffr, Success);
state.second.currentTailFilePosition_ += ffr->getSize() + 1;
- storeState();
+ storeState(context);
}
} else {
@@ -429,7 +479,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
+ extension;
flowFile->updateKeyedAttribute(FILENAME, logName);
state.second.currentTailFilePosition_ += flowFile->getSize();
- storeState();
+ storeState(context);
}
}
state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 470689a..1443af0 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -64,7 +64,6 @@ class TailFile : public core::Processor {
}
// Destructor
virtual ~TailFile() {
- storeState();
}
// Processor Name
static constexpr char const* ProcessorName = "TailFile";
@@ -92,9 +91,9 @@ class TailFile : public core::Processor {
// Initialize, over write by NiFi TailFile
void initialize(void) override;
// recoverState
- bool recoverState();
+ bool recoverState(const std::shared_ptr<core::ProcessContext>& context);
// storeState
- void storeState();
+ bool storeState(const std::shared_ptr<core::ProcessContext>& context);
private:
@@ -105,6 +104,8 @@ class TailFile : public core::Processor {
std::string state_file_;
// Delimiter for the data incoming from the tailed file.
std::string delimiter_;
+ // StateManager
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
// determine if state is recovered;
bool state_recovered_;
@@ -119,7 +120,7 @@ class TailFile : public core::Processor {
/**
* Check roll over for the provided file.
*/
- void checkRollOver(TailState &file, const std::string &base_file_name);
+ void checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name);
std::shared_ptr<logging::Logger> logger_;
};
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index b715fde..a255c92 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-#include <stdio.h>
+#include <cstdio>
#include <fstream>
#include <map>
#include <memory>
@@ -32,6 +32,7 @@
#include "core/Core.h"
#include "core/FlowFile.h"
#include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
#include "unit/ProvenanceTestHelper.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
@@ -45,6 +46,7 @@ static std::string NEWLINE_FILE = "" // NOLINT
"four,five,six, seven";
static const char *TMP_FILE = "minifi-tmpfile.txt";
static const char *STATE_FILE = "minifi-state-file.txt";
+
TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
// Create and write to the test file
@@ -72,7 +74,6 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
testController.runSession(plan, false);
@@ -85,10 +86,6 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0"));
LogTestController::getInstance().reset();
-
- // Delete the test and state file.
-
- remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
}
TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
@@ -116,11 +113,7 @@ TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
tmpfile << NEWLINE_FILE;
tmpfile.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
testController.runSession(plan, true);
@@ -134,14 +127,9 @@ TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
appendStream << std::endl;
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("position 14"));
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
LogTestController::getInstance().reset();
-
- // Delete the test and state file.
-
- remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
}
TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
@@ -170,27 +158,22 @@ TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
tmpfile << NEWLINE_FILE;
tmpfile.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
plan->reset(true); // start a new but with state file
- remove(std::string(state_file.str() + "." + id).c_str());
+ LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
- testController.runSession(plan, true);
+ plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->clear();
- REQUIRE(LogTestController::getInstance().contains("position 0"));
+ testController.runSession(plan, true);
// if we lose state we restart
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
-
- // Delete the test and state file.
}
TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
@@ -222,65 +205,46 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
appendStream.write("\n", 1);
appendStream.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
testController.runSession(plan, true);
-
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
+ std::string filePath, fileName;
+ REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file.str(), filePath, fileName));
// should stay the same
for (int i = 0; i < 5; i++) {
plan->reset(true); // start a new but with state file
- auto statefile = state_file.str() + "." + id;
-
- remove(statefile.c_str());
-
- std::ofstream newstatefile;
- newstatefile.open(statefile);
- newstatefile << "FILENAME=" << temp_file.str() << std::endl;
- newstatefile << "POSITION=14" << std::endl;
- newstatefile.close();
+ plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->set({{"file.0.name", fileName},
+ {"file.0.position", "14"},
+ {"file.0.current", temp_file.str()}});
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("position 14"));
-
// if we lose state we restart
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
}
for (int i = 14; i < 34; i++) {
plan->reset(true); // start a new but with state file
- auto statefile = state_file.str() + "." + id;
+ plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->set({{"file.0.name", fileName},
+ {"file.0.position", std::to_string(i)},
+ {"file.0.current", temp_file.str()}});
- remove(statefile.c_str());
-
- std::ofstream newstatefile;
- newstatefile.open(statefile);
- newstatefile << "FILENAME=" << temp_file.str() << std::endl;
- newstatefile << "POSITION=" << i << std::endl;
- newstatefile.close();
testController.runSession(plan, true);
- REQUIRE(LogTestController::getInstance().contains("position " + std::to_string(i)));
}
plan->runCurrentProcessor();
for (int i = 14; i < 34; i++) {
REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile." + std::to_string(i) + "-34.txt"));
}
- // Delete the test and state file.
-
- remove(std::string(state_file.str() + "." + id).c_str());
}
-TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
+
+TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
// Create and write to the test file
TestController testController;
LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -288,79 +252,110 @@ TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
LogTestController::getInstance().setTrace<TestPlan>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
- std::shared_ptr<TestPlan> plan = testController.createPlan();
- std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+ auto plan = testController.createPlan();
+ auto tailfile = plan->addProcessor("TailFile", "tailfileProc");
auto id = tailfile->getUUIDStr();
- plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+ auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+ plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
- std::stringstream temp_file;
- temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;
- std::ofstream tmpfile;
- tmpfile.open(temp_file.str());
- tmpfile << NEWLINE_FILE;
- tmpfile.close();
+ auto createTempFile = [&](const std::string& file_name) -> std::string /*file_path*/ {
+ std::stringstream temp_file;
+ temp_file << dir << utils::file::FileUtils::get_separator() << file_name;
- std::ofstream appendStream;
- appendStream.open(temp_file.str(), std::ios_base::app);
- appendStream.write("\n", 1);
- appendStream.close();
+ std::ofstream tmpfile;
+ tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
+ tmpfile << NEWLINE_FILE << "\n";
+ tmpfile.close();
+
+ return temp_file.str();
+ };
std::stringstream state_file;
state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+ auto statefile = state_file.str() + "." + id;
- testController.runSession(plan, true);
+ SECTION("single") {
+ const std::string temp_file = createTempFile(TMP_FILE);
-#ifdef WIN32
- REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-14.txt"));
-#else
- REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
-#endif
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
- plan->reset(true); // start a new but with state file
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ SECTION("legacy") {
+ newstatefile << "FILENAME=" << temp_file << std::endl;
+ newstatefile << "POSITION=14" << std::endl;
+ }
+ SECTION("newer single") {
+ newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+ newstatefile << "POSITION." << TMP_FILE << "=14" << std::endl;
+ newstatefile << "CURRENT." << TMP_FILE << "=" << temp_file << std::endl;
+ }
+ newstatefile.close();
- auto statefile = state_file.str() + "." + id;
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
- remove(statefile.c_str());
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
- SECTION("No Filename") {
- std::ofstream newstatefile;
- newstatefile.open(statefile);
- newstatefile << "POSITION=14" << std::endl;
- newstatefile.close();
- REQUIRE_THROWS(testController.runSession(plan, true));
+ std::string filePath, fileName;
+ REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file, filePath, fileName));
+ std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName},
+ {"file.0.position", "35"},
+ {"file.0.current", temp_file}};
+ REQUIRE(expected_state == state);
}
- SECTION("Invalid current filename") {
- std::ofstream newstatefile;
- newstatefile.open(statefile);
- newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
- newstatefile << "CURRENT.minifi-tempfile.txt=minifi-tmpfile.txt" << std::endl;
- newstatefile << "POSITION=14" << std::endl;
- newstatefile.close();
- REQUIRE_THROWS(testController.runSession(plan, true));
- }
- SECTION("No current filename and partial path") {
- std::ofstream newstatefile;
- newstatefile.open(statefile);
- newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
- newstatefile << "POSITION=14" << std::endl;
- newstatefile.close();
- REQUIRE_THROWS(testController.runSession(plan, true));
- }
+ SECTION("multiple") {
+ const std::string file_name_1 = "bar.txt";
+ const std::string file_name_2 = "foo.txt";
+ const std::string temp_file_1 = createTempFile(file_name_1);
+ const std::string temp_file_2 = createTempFile(file_name_2);
+
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
-// Delete the test and state file.
+ std::ofstream newstatefile;
+ newstatefile.open(statefile);
+ newstatefile << "FILENAME=" << file_name_1 << std::endl;
+ newstatefile << "POSITION." << file_name_1 << "=14" << std::endl;
+ newstatefile << "CURRENT." << file_name_1 << "=" << temp_file_1 << std::endl;
+ newstatefile << "FILENAME=" << file_name_2 << std::endl;
+ newstatefile << "POSITION." << file_name_2 << "=15" << std::endl;
+ newstatefile << "CURRENT." << file_name_2 << "=" << temp_file_2 << std::endl;
+ newstatefile.close();
- remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains(file_name_1.substr(0, file_name_1.rfind('.')) + ".14-34.txt"));
+ REQUIRE(LogTestController::getInstance().contains(file_name_2.substr(0, file_name_2.rfind('.')) + ".15-34.txt"));
+
+ std::unordered_map<std::string, std::string> state;
+ REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
+
+ std::string filePath1, filePath2, fileName1, fileName2;
+ REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_1, filePath1, fileName1));
+ REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_2, filePath2, fileName2));
+ std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName1},
+ {"file.0.position", "35"},
+ {"file.0.current", temp_file_1},
+ {"file.1.name", fileName2},
+ {"file.1.position", "35"},
+ {"file.1.current", temp_file_2}};
+ REQUIRE(expected_state == state);
+ }
}
+
TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
// Create and write to the test file
@@ -385,11 +380,15 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
tmpfile << NEWLINE_FILE;
tmpfile.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
+ SECTION("Single") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
+ }
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+ SECTION("Multiple") {
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+ }
testController.runSession(plan, false);
auto records = plan->getProvenanceRecords();
@@ -432,11 +431,7 @@ TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
tmpfile.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
@@ -486,7 +481,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
std::string line1(4097, '\n');
std::mt19937 gen(std::random_device { }());
std::generate_n(line1.begin(), 4095, [&]() -> char {
- return 32 + gen() % (127 - 32);
+ return 32 + gen() % (127 - 32);
});
std::string line2("foo");
std::string line3("bar");
@@ -514,11 +509,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
tmpfile.close();
- std::stringstream state_file;
- state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
@@ -556,14 +547,13 @@ TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
auto dir = testController.createTempDirectory(format);
SECTION("No File and No base") {
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
}
SECTION("No base") {
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+ plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
}
- plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
+ // Both sections select "Multiple file" mode without setting a base directory; the run is expected to throw.
REQUIRE_THROWS(plan->runNextProcessor());
}
@@ -592,9 +582,6 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
#endif
in_file.append("testfifo.txt");
- std::string state_file(dir);
- state_file.append("tailfile.state");
-
std::ofstream in_file_stream(in_file);
in_file_stream << NEWLINE_FILE;
in_file_stream.flush();
@@ -604,16 +591,16 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
SECTION("single") {
- plan->setProperty(
- tail_file,
- processors::TailFile::FileName.getName(), in_file);
+ plan->setProperty(
+ tail_file,
+ processors::TailFile::FileName.getName(), in_file);
}
SECTION("Multiple") {
- plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
- plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
- plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+ plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
}
- plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+
auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
@@ -621,7 +608,7 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
plan->runNextProcessor(); // Tail
plan->runNextProcessor(); // Log
- std::cout << " find " << expected_pieces << std::endl;
+
+ // The Log step should report exactly expected_pieces logged flow files at this point.
REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(expected_pieces) + " flow files"));
in_file_stream << DELIM;
@@ -665,9 +652,6 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
char format[] = "/tmp/gt.XXXXXX";
auto dir = testController.createTempDirectory(format);
- std::string state_file(dir);
- state_file.append("tailfile.state");
-
// Define test input file
std::string in_file(dir);
in_file.append("fruits.txt");
@@ -687,7 +671,6 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
auto tail_file = plan->addProcessor("TailFile", "Tail");
plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
- plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
// Log as many FFs as it can to make sure exactly the expected amount is produced
@@ -711,3 +694,4 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
}
+
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index d727de2..1bbcfe9 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -2,6 +2,7 @@
#include <direct.h>
+#include "wel/UnicodeConversion.h"
#include "utils/file/FileUtils.h"
#include "utils/ScopeGuard.h"
@@ -10,16 +11,30 @@ namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
-
-Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<logging::Logger> logger)
- :logger_(logger) {
- if (!createUUIDDir(bookmarkRootDir, uuid, filePath_))
- return;
-
- filePath_ += "Bookmark.txt";
-
- if (!getBookmarkXmlFromFile(bookmarkXml_)) {
- return;
+// Key under which the bookmark XML is kept in the component state map.
+static const std::string BOOKMARK_KEY = "bookmark";
+
+// Loads the bookmark XML from the component state manager. If no bookmark is found there (or get()
+// fails) and bookmarkRootDir is non-empty, the XML is read from the legacy
+// <bookmarkRootDir>/uuid/<uuid>/Bookmark.txt file, saved and persisted through the state manager,
+// and on success the legacy file is renamed with a "-migrated" suffix.
+Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger)
+ : logger_(logger)
+ , state_manager_(state_manager) {
+ std::unordered_map<std::string, std::string> state_map;
+ if (state_manager_->get(state_map) && state_map.count(BOOKMARK_KEY) == 1U) {
+ bookmarkXml_ = wel::to_wstring(state_map[BOOKMARK_KEY].c_str());
+ } else if (!bookmarkRootDir.empty()) {
+ // Location of the bookmark file used by the previous file-based implementation.
+ filePath_ = utils::file::FileUtils::concat_path(
+ utils::file::FileUtils::concat_path(
+ utils::file::FileUtils::concat_path(bookmarkRootDir, "uuid"), uuid), "Bookmark.txt");
+
+ std::wstring bookmarkXml;
+ if (getBookmarkXmlFromFile(bookmarkXml)) {
+ if (saveBookmarkXml(bookmarkXml) && state_manager_->persist()) {
+ logger_->log_info("State migration successful");
+ // Keep the legacy file under a new name instead of deleting it; the rename result is not checked.
+ rename(filePath_.c_str(), (filePath_ + "-migrated").c_str());
+ } else {
+ logger_->log_warn("Could not migrate state from specified State Directory %s", bookmarkRootDir);
+ }
+ }
+ } else {
+ logger_->log_info("Found no stored state");
}
if (!bookmarkXml_.empty()) {
@@ -31,9 +46,8 @@ Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const
LOG_LAST_ERROR(EvtCreateBookmark);
bookmarkXml_.clear();
- if (!createEmptyBookmarkXmlFile()) {
- return;
- }
+ // The stored XML could not be turned into a bookmark: drop it from the component state.
+ state_map.erase(BOOKMARK_KEY);
+ state_manager_->set(state_map);
}
if (!(hBookmark_ = EvtCreateBookmark(0))) {
@@ -88,6 +102,15 @@ EVT_HANDLE Bookmark::getBookmarkHandleFromXML() {
return hBookmark_;
}
+// Caches bookmarkXml in bookmarkXml_ and stores it (UTF-8 converted) in the component state under
+// BOOKMARK_KEY. The map passed to set() contains only this key. Returns the result of set().
+bool Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
+ bookmarkXml_ = bookmarkXml;
+
+ std::unordered_map<std::string, std::string> state_map;
+ state_map[BOOKMARK_KEY] = wel::to_string(bookmarkXml_.c_str());
+
+ return state_manager_->set(state_map);
+}
+
bool Bookmark::saveBookmark(EVT_HANDLE hEvent)
{
std::wstring bookmarkXml;
@@ -95,9 +118,7 @@ bool Bookmark::saveBookmark(EVT_HANDLE hEvent)
return false;
}
- saveBookmarkXml(bookmarkXml);
-
- return true;
+ // Report whether the new bookmark could actually be stored.
+ return saveBookmarkXml(bookmarkXml);
}
bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
@@ -122,7 +143,7 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
return false;
}
- bookmarkXml = &buf[0];
+ bookmarkXml = buf.data();
return true;
}
@@ -135,62 +156,12 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
return false;
}
-void Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
- bookmarkXml_ = bookmarkXml;
-
- // Write new bookmark over old and in the end write '!'. Then new bookmark is read until '!'. This is faster than truncate.
- file_.seekp(std::ios::beg);
-
- file_ << bookmarkXml << L'!';
-
- file_.flush();
-}
-
-bool Bookmark::createEmptyBookmarkXmlFile() {
- if (file_.is_open()) {
- file_.close();
- }
-
- file_.open(filePath_, std::ios::out);
- if (!file_.is_open()) {
- logger_->log_error("Cannot open %s", filePath_.c_str());
- return false;
- }
-
- return true;
-}
-
-bool Bookmark::createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir)
-{
- if (bookmarkRootDir.empty()) {
- dir.clear();
- return false;
- }
-
- auto dirWithBackslash = bookmarkRootDir;
- if (bookmarkRootDir.back() != '\\') {
- dirWithBackslash += '\\';
- }
-
- dir = dirWithBackslash + "uuid\\" + uuid + "\\";
-
- utils::file::FileUtils::create_dir(dir);
-
- auto dirCreated = utils::file::FileUtils::is_directory(dir.c_str());
- if (!dirCreated) {
- logger_->log_error("Cannot create %s", dir.c_str());
- dir.clear();
- }
-
- return dirCreated;
-}
-
bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
bookmarkXml.clear();
std::wifstream file(filePath_);
if (!file.is_open()) {
- return createEmptyBookmarkXmlFile();
+ // No legacy bookmark file: there is nothing to migrate.
+ return false;
}
// Generically is not efficient, but bookmarkXML is small ~100 bytes.
@@ -206,13 +177,6 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
file.close();
- file_.open(filePath_);
- if (!file_.is_open()) {
- logger_->log_error("Cannot open %s", filePath_.c_str());
- bookmarkXml.clear();
- return false;
- }
-
if (bookmarkXml.empty()) {
return true;
}
@@ -222,7 +186,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
if (std::wstring::npos == pos) {
logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str());
bookmarkXml.clear();
- return createEmptyBookmarkXmlFile();
+ // Malformed legacy file (no '!' terminator): treat it as having no bookmark to migrate.
+ return false;
}
// Remove '!'.
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 02e47bb..4d40064 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -19,25 +19,22 @@ namespace processors {
class Bookmark
{
public:
- Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<logging::Logger> logger);
+ // state_manager: component state storage that holds the bookmark XML.
+ // bookmarkRootDir: legacy State Directory; only read to migrate an old Bookmark.txt into state_manager.
+ Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger);
~Bookmark();
operator bool() const;
EVT_HANDLE getBookmarkHandleFromXML();
bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
- void saveBookmarkXml(const std::wstring& bookmarkXml);
+ // Caches bookmarkXml and stores it in the component state; returns the state manager's set() result.
+ bool saveBookmarkXml(const std::wstring& bookmarkXml);
private:
bool saveBookmark(EVT_HANDLE hEvent);
- bool createEmptyBookmarkXmlFile();
- bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
- std::string filePath(const std::string& uuid);
bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
private:
std::shared_ptr<logging::Logger> logger_;
+ // Storage for the bookmark XML.
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+ // Path of the legacy Bookmark.txt; only set when migrating from a State Directory.
std::string filePath_;
bool ok_{};
EVT_HANDLE hBookmark_{};
- std::wfstream file_;
std::wstring bookmarkXml_;
};
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index c7f41c7..85686c0 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -146,7 +146,7 @@ core::Property ConsumeWindowsEventLog::BookmarkRootDirectory(
core::PropertyBuilder::createProperty("State Directory")->
isRequired(false)->
withDefaultValue("CWELState")->
- withDescription("Directory which contains processor state data.")->
+ withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->
build());
core::Property ConsumeWindowsEventLog::ProcessOldEvents(
@@ -201,6 +201,11 @@ bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const
}
void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+ // The Bookmark keeps its XML in the component state manager, so scheduling fails without one.
+ state_manager_ = context->getStateManager();
+ if (state_manager_ == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+
context->getProperty(IdentifierMatcher.getName(), regex_);
context->getProperty(ResolveAsAttributes.getName(), resolve_as_attributes_);
context->getProperty(IdentifierFunction.getName(), apply_identifier_function_);
@@ -263,7 +268,7 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
logger_->log_error("State Directory is empty");
return;
}
- pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, logger_);
+ // NOTE(review): "State Directory" is now only used for state migration, yet an empty value still
+ // returns above without creating a bookmark - confirm this is intended.
+ pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, state_manager_, logger_);
if (!*pBookmark_) {
pBookmark_.reset();
return;
@@ -591,10 +596,6 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
}
}
-void ConsumeWindowsEventLog::notifyStop()
-{
-}
-
void ConsumeWindowsEventLog::LogWindowsError()
{
auto error_id = GetLastError();
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 02ce868..c01eb89 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -97,7 +97,6 @@ public:
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
//! Initialize, overwrite by NiFi ConsumeWindowsEventLog
virtual void initialize(void) override;
- virtual void notifyStop() override;
protected:
@@ -114,12 +113,13 @@ protected:
private:
// Logger
+ std::shared_ptr<logging::Logger> logger_;
+ // Component state manager obtained in onSchedule(); handed to the Bookmark to store its XML.
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
wel::METADATA_NAMES header_names_;
std::string header_delimiter_;
std::string channel_;
std::wstring wstrChannel_;
std::wstring wstrQuery_;
- std::shared_ptr<logging::Logger> logger_;
std::string regex_;
bool resolve_as_attributes_;
bool apply_identifier_function_;
diff --git a/extensions/windows-event-log/wel/UnicodeConversion.h b/extensions/windows-event-log/wel/UnicodeConversion.h
index 01c48bc..90f5200 100644
--- a/extensions/windows-event-log/wel/UnicodeConversion.h
+++ b/extensions/windows-event-log/wel/UnicodeConversion.h
@@ -34,6 +34,11 @@ namespace org {
ATL::CW2A aString(pChar, CP_UTF8);
return std::string(aString);
}
+
+ // Converts a UTF-8 encoded C string to a std::wstring (inverse of the wide-to-UTF-8 conversion above).
+ inline std::wstring to_wstring(const char* pChar) {
+ ATL::CA2W wString(pChar, CP_UTF8);
+ return std::wstring(wString);
+ }
} /* namespace wel */
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 9eaaf58..4a2c296 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF)
set(TLS_SOURCES "src/io/tls/*.cpp")
endif()
-file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+# NOTE: file(GLOB) is evaluated at configure time; re-run CMake after adding sources (e.g. under src/controllers/keyvalue).
+file(GLOB SOURCES "src/utils/file/*.cpp" "src/sitetosite/*.cpp" "src/core/logging/*.cpp" "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp" "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
if(WIN32)
include(FindMessageCompiler)
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5efac83..0ee0e01 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -219,6 +219,11 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
/**
+ * Removes all controller services from this controller service provider.
+ */
+ virtual void clearControllerServices();
+
+ /**
* Gets all controller services.
*/
virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices();
@@ -286,6 +291,11 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
virtual void enableAllControllerServices();
/**
+ * Disables all controller services for the provider; counterpart of enableAllControllerServices().
+ */
+ virtual void disableAllControllerServices();
+
+ /**
* Retrieves metrics node
* @return metrics response node
*/
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
new file mode 100644
index 0000000..e632503
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_
+
+#include "PersistableKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <condition_variable>
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Base class for persistable key-value store services that persist their contents automatically.
+ * This is configured through the AlwaysPersist and AutoPersistenceInterval properties.
+ */
+class AbstractAutoPersistingKeyValueStoreService : public PersistableKeyValueStoreService {
+ public:
+ explicit AbstractAutoPersistingKeyValueStoreService(const std::string& name, const std::string& id);
+ explicit AbstractAutoPersistingKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+ virtual ~AbstractAutoPersistingKeyValueStoreService();
+
+ static core::Property AlwaysPersist;
+ static core::Property AutoPersistenceInterval;
+
+ virtual void initialize() override;
+ virtual void onEnable() override;
+ virtual void notifyStop() override;
+
+ protected:
+ // Presumably the value of the AlwaysPersist property (persist after every change) - confirm in the .cpp.
+ bool always_persist_;
+ // Presumably the value of the AutoPersistenceInterval property; its unit is not visible here (TODO confirm).
+ uint64_t auto_persistence_interval_;
+
+ // Background persistence thread and its coordination state; running_ is presumably guarded by
+ // persisting_mutex_, with persisting_cv_ used to wake the thread early (confirm in the .cpp).
+ std::thread persisting_thread_;
+ bool running_;
+ std::mutex persisting_mutex_;
+ std::condition_variable persisting_cv_;
+ void persistingThreadFunc();
+
+ private:
+ std::shared_ptr<logging::Logger> logger_;
+
+ // Presumably signals persisting_thread_ to stop and joins it (confirm in the .cpp).
+ void stopPersistingThread();
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_ */
diff --git a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
new file mode 100644
index 0000000..cddd152
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_
+
+#include "core/Core.h"
+#include "core/CoreComponentState.h"
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+ /**
+ * CoreComponentStateManagerProvider that hands out per-component state managers and leaves
+ * the actual storage to subclasses through the protected *Impl primitives.
+ */
+ class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from_this<AbstractCoreComponentStateManagerProvider>,
+ public core::CoreComponentStateManagerProvider {
+ public:
+ virtual ~AbstractCoreComponentStateManagerProvider();
+
+ // Keep the base-class getCoreComponentStateManager(const core::CoreComponent&) overload visible:
+ // declaring the uuid overload below would otherwise hide it when called through this class.
+ using core::CoreComponentStateManagerProvider::getCoreComponentStateManager;
+
+ virtual std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) override;
+
+ virtual std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
+
+ // State manager for a single component id; holds a shared reference to the owning provider.
+ class AbstractCoreComponentStateManager : public core::CoreComponentStateManager {
+ private:
+ std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider_;
+ std::string id_;
+ // Presumably a cached copy of the component's state and whether that cache is current (confirm in the .cpp).
+ bool state_valid_;
+ std::unordered_map<std::string, std::string> state_;
+
+ public:
+ AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const std::string& id);
+
+ virtual bool set(const std::unordered_map<std::string, std::string>& kvs) override;
+
+ virtual bool get(std::unordered_map<std::string, std::string>& kvs) override;
+
+ virtual bool clear() override;
+
+ virtual bool persist() override;
+ };
+
+ protected:
+ // Storage primitives implemented by concrete providers.
+ virtual bool setImpl(const std::string& key, const std::string& value) = 0;
+ virtual bool getImpl(const std::string& key, std::string& value) = 0;
+ virtual bool getImpl(std::unordered_map<std::string, std::string>& kvs) = 0;
+ virtual bool removeImpl(const std::string& key) = 0;
+ virtual bool persistImpl() = 0;
+
+ // Convert a component's state map to/from a single string (format defined in the .cpp).
+ virtual std::string serialize(const std::unordered_map<std::string, std::string>& kvs);
+ virtual bool deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs);
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_ */
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h b/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
new file mode 100644
index 0000000..5594435
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_
+
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/controller/ControllerService.h"
+
+#include <unordered_map>
+#include <unordered_set>
+#include <string>
+#include <cstdint>
+#include <functional>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+// Controller service interface for a string-to-string key-value store.
+class KeyValueStoreService : public core::controller::ControllerService {
+ public:
+ explicit KeyValueStoreService(const std::string& name, const std::string& id);
+ explicit KeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+ virtual ~KeyValueStoreService();
+
+ virtual void yield() override;
+ virtual bool isRunning() override;
+ virtual bool isWorkAvailable() override;
+ // Presumably stores value under key; returns whether it succeeded.
+ virtual bool set(const std::string& key, const std::string& value) = 0;
+ // Reads the value stored under key into value (presumably false if the key is missing or on error).
+ virtual bool get(const std::string& key, std::string& value) = 0;
+ // Reads all stored key-value pairs into kvs.
+ virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+ // Presumably removes key from the store.
+ virtual bool remove(const std::string& key) = 0;
+ // Presumably removes every key from the store.
+ virtual bool clear() = 0;
+ // Invokes update_func(exists, value) for key; how its return value and edits are applied: TODO confirm.
+ virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) = 0;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_ */
diff --git a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
new file mode 100644
index 0000000..c9bda4f
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+
+#include "KeyValueStoreService.h"
+#include "AbstractCoreComponentStateManagerProvider.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+// KeyValueStoreService whose contents can be persisted; also serves as a core component state manager provider.
+class PersistableKeyValueStoreService : virtual public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider {
+ public:
+ explicit PersistableKeyValueStoreService(const std::string& name, const std::string& id);
+ explicit PersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+ virtual ~PersistableKeyValueStoreService();
+ // Persists the store's contents (presumably to durable storage); presumably returns whether it succeeded.
+ virtual bool persist() = 0;
+ // Storage primitives for AbstractCoreComponentStateManagerProvider, presumably built on this store's own operations.
+ protected:
+ virtual bool setImpl(const std::string& key, const std::string& value) override;
+ virtual bool getImpl(const std::string& key, std::string& value) override;
+ virtual bool getImpl(std::unordered_map<std::string, std::string>& kvs) override;
+ virtual bool removeImpl(const std::string& key) override;
+ virtual bool persistImpl() override;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_ */
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index 4905001..e3d6d62 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -98,11 +98,13 @@ class ConfigurableComponent {
* @return result of set operation.
*/
bool setSupportedProperties(std::set<Property> properties);
+
/**
- * Sets supported properties for the ConfigurableComponent
- * @param supported properties
- * @return result of set operation.
+ * Updates the supported properties for the ConfigurableComponent
+ * @param properties new supported properties
+ * @return result of update operation.
*/
+ bool updateSupportedProperties(std::set<Property> properties);
/**
* Gets whether or not this processor supports dynamic properties.
diff --git a/libminifi/include/core/CoreComponentState.h b/libminifi/include/core/CoreComponentState.h
new file mode 100644
index 0000000..21c7613
--- /dev/null
+++ b/libminifi/include/core/CoreComponentState.h
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+#define LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+
+#include "Core.h"
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Interface for reading and writing the state of a single core component,
+ * represented as a string-to-string map.
+ */
+class CoreComponentStateManager {
+ public:
+ virtual ~CoreComponentStateManager() = default;
+
+ // Stores kvs as the component's state (presumably replacing the previous state); returns whether it succeeded.
+ virtual bool set(const std::unordered_map<std::string, std::string>& kvs) = 0;
+
+ // Loads the component's state into kvs; returns whether it succeeded.
+ virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+
+ // Clears the component's state.
+ virtual bool clear() = 0;
+
+ // Persists the component's state (presumably to durable storage).
+ virtual bool persist() = 0;
+};
+
+/**
+ * Hands out CoreComponentStateManager instances for core components, keyed by UUID string.
+ */
+class CoreComponentStateManagerProvider {
+ public:
+ virtual ~CoreComponentStateManagerProvider() = default;
+
+ virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) = 0;
+
+ // Convenience overload: looks up the state manager by the component's UUID string.
+ virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const CoreComponent& component) {
+ return getCoreComponentStateManager(component.getUUIDStr());
+ }
+
+ // Returns the stored state of every component (presumably keyed by component UUID).
+ virtual std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() = 0;
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_ */
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index db44db9..a6be32d 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -32,9 +32,12 @@
#include "core/controller/ControllerServiceProvider.h"
#include "core/controller/ControllerServiceLookup.h"
#include "core/logging/LoggerConfiguration.h"
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
#include "ProcessorNode.h"
#include "core/Repository.h"
#include "core/FlowFile.h"
+#include "core/CoreComponentState.h"
+#include "utils/file/FileUtils.h"
#include "VariableRegistry.h"
namespace org {
@@ -62,6 +65,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
configure_(std::make_shared<minifi::Configure>()),
initialized_(false) {
repo_ = repo;
+ // NOTE(review): nullptr is passed here even though configure_ was just initialized, while
+ // getOrCreateDefaultStateManagerProvider dereferences its configuration argument unconditionally.
+ // Confirm that getStateManagerProvider (not visible here) handles a null configuration.
+ state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, nullptr);
}
// Constructor
@@ -79,6 +83,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
initialized_(false) {
repo_ = repo;
+ state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, configuration);
}
// Destructor
virtual ~ProcessContext() {
@@ -208,6 +213,112 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
bool isInitialized() const {
return initialized_;
}
+
+  /** Name under which the default state manager provider controller service is registered and looked up. */
+  static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider";
+
+  /**
+   * Returns the state manager of the processor this context belongs to.
+   * @return the CoreComponentStateManager created by the state manager provider for the processor node,
+   *         or nullptr if there is no provider or no processor node
+   */
+  std::shared_ptr<CoreComponentStateManager> getStateManager() {
+    // Guard both pointers: the provider may have failed to initialize, and dereferencing a null node is UB.
+    if (state_manager_provider_ == nullptr || processor_node_ == nullptr) {
+      return nullptr;
+    }
+    return state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+  }
+
+  /**
+   * Returns the default CoreComponentStateManagerProvider (registered as DefaultStateManagerProviderName),
+   * creating, configuring and enabling it if it cannot be found.
+   *
+   * A RocksDbPersistableKeyValueStoreService with its "Directory" set to <base_path>/corecomponentstate is tried first;
+   * if any step of creating it fails, an UnorderedMapPersistableKeyValueStoreService with its "File" set to
+   * <base_path>/corecomponentstate.txt is tried instead.
+   *
+   * @param controller_service_provider used to look up and create the controller service; nullptr yields nullptr
+   * @param configuration source of the optional always-persist / auto-persistence-interval options; may be nullptr,
+   *        in which case those properties are left at the service defaults
+   * @param base_path directory under which the state is stored; nullptr is treated as ""
+   * @return the default provider, or nullptr if it could be neither found nor created
+   */
+  static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
+      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      std::shared_ptr<minifi::Configure> configuration,
+      const char *base_path = "") {
+    if (controller_service_provider == nullptr) {
+      return nullptr;
+    }
+    if (base_path == nullptr) {
+      base_path = "";
+    }
+
+    // Serializes calls to this function so concurrent callers do not race to create the default provider.
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
+
+    /* See if we have already created a default provider */
+    std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
+    if (node != nullptr) {
+      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+    }
+
+    /* Try to get configuration options for default provider.
+     * configuration may be nullptr (ProcessContext passes nullptr when constructed without one). */
+    std::string always_persist, auto_persistence_interval;
+    if (configuration != nullptr) {
+      configuration->get(Configure::nifi_state_management_provider_local_always_persist, always_persist);
+      configuration->get(Configure::nifi_state_management_provider_local_auto_persistence_interval, auto_persistence_interval);
+    }
+
+    /* Function to help creating a provider; returns nullptr if any step fails */
+    // NOTE(review): a node returned by createControllerService is not removed when a later step fails; if it stays
+    // registered under DefaultStateManagerProviderName it may shadow the fallback provider — confirm.
+    auto create_provider = [&](
+        const std::string& type,
+        const std::string& longType,
+        const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::CoreComponentStateManagerProvider> {
+      node = controller_service_provider->createControllerService(type, longType, DefaultStateManagerProviderName, true /*firstTimeAdded*/);
+      if (node == nullptr) {
+        return nullptr;
+      }
+      node->initialize();
+      auto provider = node->getControllerServiceImplementation();
+      if (provider == nullptr) {
+        return nullptr;
+      }
+      if (!always_persist.empty() && !provider->setProperty(
+          controllers::AbstractAutoPersistingKeyValueStoreService::AlwaysPersist.getName(), always_persist)) {
+        return nullptr;
+      }
+      if (!auto_persistence_interval.empty() && !provider->setProperty(
+          controllers::AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval.getName(), auto_persistence_interval)) {
+        return nullptr;
+      }
+      for (const auto& extraProperty : extraProperties) {
+        if (!provider->setProperty(extraProperty.first, extraProperty.second)) {
+          return nullptr;
+        }
+      }
+      if (!node->enable()) {
+        return nullptr;
+      }
+      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(provider);
+    };
+
+    /* Try to create a RocksDB-backed provider */
+    auto provider = create_provider("RocksDbPersistableKeyValueStoreService",
+                                    "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+                                    {{"Directory", utils::file::FileUtils::concat_path(base_path, "corecomponentstate")}});
+    if (provider != nullptr) {
+      return provider;
+    }
+
+    /* Fall back to a locked unordered map-backed provider; give up (nullptr) if this fails too */
+    return create_provider("UnorderedMapPersistableKeyValueStoreService",
+                           "org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+                           {{"File", utils::file::FileUtils::concat_path(base_path, "corecomponentstate.txt")}});
+  }
+
+  /**
+   * Returns the CoreComponentStateManagerProvider to use.
+   *
+   * If the configuration sets nifi.state.management.provider.local, the controller service with that id is used;
+   * otherwise the default provider is looked up or created via getOrCreateDefaultStateManagerProvider().
+   *
+   * @param logger used to report failures
+   * @param controller_service_provider where controller services are looked up; nullptr yields nullptr
+   * @param configuration may be nullptr, in which case the default provider is used
+   * @return the provider, or nullptr on failure (failures other than a null controller_service_provider are logged)
+   */
+  static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
+      std::shared_ptr<logging::Logger> logger,
+      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      std::shared_ptr<minifi::Configure> configuration) {
+    if (controller_service_provider == nullptr) {
+      return nullptr;
+    }
+    std::string id;
+    if (configuration != nullptr && configuration->get(minifi::Configure::nifi_state_management_provider_local, id)) {
+      auto node = controller_service_provider->getControllerServiceNode(id);
+      if (node == nullptr) {
+        logger->log_error("Failed to find the CoreComponentStateManagerProvider %s defined by %s", id, minifi::Configure::nifi_state_management_provider_local);
+        return nullptr;
+      }
+      auto state_manager_provider = std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+      if (state_manager_provider == nullptr) {
+        // Otherwise a misconfigured service type would silently disable state management.
+        logger->log_error("Controller service %s defined by %s is not a CoreComponentStateManagerProvider", id, minifi::Configure::nifi_state_management_provider_local);
+      }
+      return state_manager_provider;
+    }
+    auto state_manager_provider = getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration);
+    if (state_manager_provider == nullptr) {
+      logger->log_error("Failed to create default CoreComponentStateManagerProvider");
+    }
+    return state_manager_provider;
+  }
+
private:
template<typename T>
@@ -217,6 +328,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
// controller service provider.
std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+ // state manager provider
+ std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
// repository shared pointer.
std::shared_ptr<core::Repository> repo_;
std::shared_ptr<core::Repository> flow_repo_;
diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h
index 5231592..d42d6f5 100644
--- a/libminifi/include/core/controller/ControllerService.h
+++ b/libminifi/include/core/controller/ControllerService.h
@@ -137,6 +137,9 @@ class ControllerService : public ConfigurableComponent, public Connectable {
+  /**
+   * Sets the state of this controller service. Switching to DISABLED additionally calls notifyStop(),
+   * letting the service stop its background work (e.g. AbstractAutoPersistingKeyValueStoreService
+   * stops its persisting thread there).
+   */
void setState(ControllerServiceState state) {
current_state_ = state;
+ if (state == DISABLED) {
+ notifyStop();
+ }
}
virtual bool supportsDynamicProperties() {
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 6a147e1..75f8b23 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -113,6 +113,11 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
/**
+ * Removes all controller services.
+ * Used e.g. by FlowConfiguration's destructor to break the shared_ptr cycle between
+ * StandardControllerServiceProvider and its StandardControllerServiceNodes.
+ */
+ virtual void clearControllerServices() = 0;
+
+ /**
* Gets a list of all controller services.
*/
virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() {
@@ -215,6 +220,8 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
virtual void enableAllControllerServices() = 0;
+ /**
+  * Disables all controller services managed by this provider.
+  */
+ virtual void disableAllControllerServices() = 0;
+
virtual bool supportsDynamicProperties() {
return false;
}
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 6ce6651..8dc6f1b 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -116,7 +116,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
virtual void enableAllControllerServices() {
logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size());
for (auto service : controller_map_->getAllControllerServices()) {
-
if (service->canEnable()) {
logger_->log_info("Enabling %s", service->getName());
agent_->enableControllerService(service);
@@ -126,6 +125,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
}
+  /**
+   * Disables every controller service in the controller map.
+   * A warning is logged for each service whose disable() call fails; the remaining services are still processed.
+   */
+  virtual void disableAllControllerServices() {
+    // Take one snapshot so the logged count matches the services actually iterated.
+    const auto services = controller_map_->getAllControllerServices();
+    // The logger takes printf-style formats; size() is a size_t, which needs %zu rather than %u.
+    logger_->log_info("Disabling %zu controller services", services.size());
+    for (const auto& service : services) {
+      if (!service->disable()) {
+        logger_->log_warn("Could not disable %s", service->getName());
+      }
+    }
+  }
+
void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
for (auto node : serviceNodes) {
enableControllerService(node);
@@ -141,6 +149,10 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
}
}
+  /**
+   * Drops every controller service node held in the controller map.
+   */
+  void clearControllerServices() {
+    this->controller_map_->clear();
+  }
+
void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
}
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 824d386..99adea5 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -100,6 +100,11 @@ class Configure : public Properties {
static const char *nifi_c2_flow_base_url;
static const char *nifi_c2_full_heartbeat;
+ // state management options
+ // Id of the controller service to use as CoreComponentStateManagerProvider; if unset, a default provider is used/created.
+ static const char *nifi_state_management_provider_local;
+ // Value for the "Always Persist" property of the default state manager provider (only applied when set).
+ static const char *nifi_state_management_provider_local_always_persist;
+ // Value for the "Auto Persistence Interval" property of the default state manager provider (only applied when set).
+ static const char *nifi_state_management_provider_local_auto_persistence_interval;
+
private:
std::string agent_identifier_;
std::string agent_class_;
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index fbb0fa9..04f585a 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -63,6 +63,9 @@ const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
+// State management configuration keys.
+const char *Configure::nifi_state_management_provider_local = "nifi.state.management.provider.local";
+const char *Configure::nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
+const char *Configure::nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
} /* namespace minifi */
} /* namespace nifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 350b64d..1cff30e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -248,6 +248,8 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
this->timer_scheduler_->stop();
this->event_scheduler_->stop();
this->cron_scheduler_->stop();
+ // stop the ControllerServices
+ this->controller_service_provider_->disableAllControllerServices();
thread_pool_.shutdown();
running_ = false;
}
@@ -793,6 +795,13 @@ std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(
}
/**
+ * Removes all controller services.
+ */
+void FlowController::clearControllerServices() {
+  // Delegate to the controller service provider, which owns the services.
+  this->controller_service_provider_->clearControllerServices();
+}
+
+/**
* Gets all controller services.
*/
std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::getAllControllerServices() {
@@ -888,6 +897,13 @@ void FlowController::enableAllControllerServices() {
controller_service_provider_->enableAllControllerServices();
}
+/**
+ * Disables all controller services by delegating to the controller service provider.
+ */
+void FlowController::disableAllControllerServices() {
+  this->controller_service_provider_->disableAllControllerServices();
+}
+
int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration) {
if (applyConfiguration(source, configuration)) {
return 1;
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 6670a73..b71b8f7 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -25,6 +25,8 @@
#include <string>
#include <memory>
#include "c2/ControllerSocketProtocol.h"
+#include "core/ProcessContext.h"
+#include "core/CoreComponentState.h"
#include "core/state/UpdateController.h"
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
@@ -431,6 +433,28 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
update_sink_->drainRepositories();
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
enqueue_c2_response(std::move(response));
+ } else if (resp.name == "corecomponentstate") {
+ // TODO(bakaid): untested
+ std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
+ auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
+ if (state_manager_provider != nullptr) {
+ for (auto &component : components) {
+ logger_->log_debug("Clearing state for component %s", component->getComponentName());
+ auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
+ if (state_manager != nullptr) {
+ component->stop(true);
+ state_manager->clear();
+ state_manager->persist();
+ component->start();
+ } else {
+ logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID());
+ }
+ }
+ } else {
+ logger_->log_error("Failed to get StateManagerProvider");
+ }
+ C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ enqueue_c2_response(std::move(response));
} else {
logger_->log_debug("Clearing unknown %s", resp.name);
}
@@ -565,7 +589,30 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
response.addPayload(std::move(options));
}
enqueue_c2_response(std::move(response));
+ return;
}
+ } else if (resp.name == "corecomponentstate") {
+ C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ response.setLabel("corecomponentstate");
+ C2Payload states(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ states.setLabel("corecomponentstate");
+ auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
+ if (state_manager_provider != nullptr) {
+ auto core_component_states = state_manager_provider->getAllCoreComponentStates();
+ for (const auto& core_component_state : core_component_states) {
+ C2Payload state(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ state.setLabel(core_component_state.first);
+ for (const auto& kv : core_component_state.second) {
+ C2ContentResponse entry(Operation::ACKNOWLEDGE);
+ entry.name = kv.first;
+ entry.operation_arguments[kv.first] = kv.second;
+ state.addContent(std::move(entry));
+ }
+ states.addPayload(std::move(state));
+ }
+ }
+ response.addPayload(std::move(states));
+ enqueue_c2_response(std::move(response));
return;
}
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
diff --git a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
new file mode 100644
index 0000000..6487eab
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Optional boolean (default false): persist on every change instead of periodically.
+core::Property AbstractAutoPersistingKeyValueStoreService::AlwaysPersist(
+    core::PropertyBuilder::createProperty("Always Persist")
+        ->withDescription("Persist every change instead of persisting it periodically.")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(false)
+        ->build());
+// Optional time period (default "1 min") of the periodic persistence task; 0 disables it.
+core::Property AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval(
+    core::PropertyBuilder::createProperty("Auto Persistence Interval")
+        ->withDescription("The interval of the periodic task persisting all values. "
+                          "Only used if Always Persist is false. "
+                          "If set to 0 seconds, auto persistence will be disabled.")
+        ->isRequired(false)
+        ->withDefaultValue<core::TimePeriodValue>("1 min")
+        ->build());
+
+// Both constructors start with auto persistence off (always_persist_ false, interval 0, running_ false);
+// the configured values are read from the properties in onEnable().
+// NOTE(review): KeyValueStoreService is initialized here next to PersistableKeyValueStoreService, which suggests
+// it is a virtual base — confirm before changing either initializer.
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, const std::string& id)
+ : KeyValueStoreService(name, id)
+ , PersistableKeyValueStoreService(name, id)
+ , always_persist_(false)
+ , auto_persistence_interval_(0U)
+ , running_(false)
+ , logger_(logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger()) {
+}
+
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+ : KeyValueStoreService(name, uuid)
+ , PersistableKeyValueStoreService(name, uuid)
+ , always_persist_(false)
+ , auto_persistence_interval_(0U)
+ , running_(false)
+ , logger_(logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger()) {
+}
+
+// Make sure the background persistence thread is joined before the object goes away.
+AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() {
+  this->stopPersistingThread();
+}
+
+// Signals the persisting thread to stop and waits for it; does nothing if no thread is joinable.
+void AbstractAutoPersistingKeyValueStoreService::stopPersistingThread() {
+  std::unique_lock<std::mutex> guard(persisting_mutex_);
+  if (!persisting_thread_.joinable()) {
+    return;
+  }
+  running_ = false;
+  persisting_cv_.notify_one();
+  // The thread must reacquire the mutex to see running_ == false, so release it before joining.
+  guard.unlock();
+  persisting_thread_.join();
+}
+
+// Registers the auto-persistence properties on top of the ones already supported by the component.
+void AbstractAutoPersistingKeyValueStoreService::initialize() {
+  ControllerService::initialize();
+  updateSupportedProperties({AlwaysPersist, AutoPersistenceInterval});
+}
+
+// Reads Always Persist / Auto Persistence Interval and, when periodic persistence is wanted and no
+// thread is running yet, starts the persisting thread. Invalid values are logged and leave the
+// corresponding member unchanged.
+void AbstractAutoPersistingKeyValueStoreService::onEnable() {
+  std::unique_lock<std::mutex> lock(persisting_mutex_);
+
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService");
+    return;
+  }
+
+  std::string property_value;
+  if (getProperty(AlwaysPersist.getName(), property_value)) {
+    utils::StringUtils::StringToBool(property_value, always_persist_);
+  } else {
+    logger_->log_error("Always Persist attribute is missing or invalid");
+  }
+
+  if (getProperty(AutoPersistenceInterval.getName(), property_value)) {
+    core::TimeUnit time_unit;
+    const bool converted = core::Property::StringToTime(property_value, auto_persistence_interval_, time_unit)
+        && core::Property::ConvertTimeUnitToMS(auto_persistence_interval_, time_unit, auto_persistence_interval_);
+    if (!converted) {
+      logger_->log_error("Auto Persistence Interval attribute is invalid");
+    }
+  } else {
+    logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
+  }
+
+  const bool wants_periodic_persistence = !always_persist_ && auto_persistence_interval_ != 0U;
+  if (wants_periodic_persistence && !persisting_thread_.joinable()) {
+    logger_->log_trace("Starting auto persistence thread");
+    running_ = true;
+    persisting_thread_ = std::thread(&AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc, this);
+  }
+
+  logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
+}
+
+// Called when the service is disabled: the background persistence thread is no longer needed.
+void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
+  this->stopPersistingThread();
+}
+
+// Body of the auto persistence thread started by onEnable(): calls persist() every auto_persistence_interval_ ms
+// until stopPersistingThread() sets running_ to false. persisting_mutex_ is held except while waiting.
+void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
+  std::unique_lock<std::mutex> lock(persisting_mutex_);
+
+  while (true) {
+    // auto_persistence_interval_ is unsigned (initialized from 0U) and presumably 64-bit, so "%d" is a
+    // mismatched printf conversion; format it via std::to_string instead.
+    logger_->log_trace("Persisting thread is going to sleep for %s ms", std::to_string(auto_persistence_interval_));
+    // The predicate makes spurious wake-ups harmless and lets stopPersistingThread() cut the wait short.
+    persisting_cv_.wait_for(lock, std::chrono::milliseconds(auto_persistence_interval_), [this] {
+      return !running_;
+    });
+
+    if (!running_) {
+      logger_->log_trace("Stopping persistence thread");
+      return;
+    }
+
+    persist();
+  }
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
new file mode 100644
index 0000000..8d817d5
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Binds the manager to a provider and component id, and loads any previously stored state.
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+    std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider,
+    const std::string& id)
+    : provider_(std::move(provider))
+    , id_(id)
+    , state_valid_(false) {
+  // The state only counts as valid if it was found in the store and could be parsed.
+  std::string stored;
+  state_valid_ = provider_->getImpl(id_, stored) && provider_->deserialize(stored, state_);
+}
+
+// Stores kvs as the component's state; the cached copy is only updated if the provider accepted it.
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const std::unordered_map<std::string, std::string>& kvs) {
+  if (!provider_->setImpl(id_, provider_->serialize(kvs))) {
+    return false;
+  }
+  state_valid_ = true;
+  state_ = kvs;
+  return true;
+}
+
+// Copies the cached state into kvs; returns false (leaving kvs untouched) when there is no valid state.
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map<std::string, std::string>& kvs) {
+  if (state_valid_) {
+    kvs = state_;
+  }
+  return state_valid_;
+}
+
+// Removes the stored state; fails without touching the store if there is no valid state.
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear() {
+  if (!state_valid_ || !provider_->removeImpl(id_)) {
+    return false;
+  }
+  state_valid_ = false;
+  state_.clear();
+  return true;
+}
+
+// Asks the provider to persist its data; only attempted when this manager holds a valid state.
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::persist() {
+  return state_valid_ && provider_->persistImpl();
+}
+
+AbstractCoreComponentStateManagerProvider::~AbstractCoreComponentStateManagerProvider() = default;
+
+// Every call returns a new manager bound to this provider and the given component id.
+std::shared_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const std::string& uuid) {
+  auto self = shared_from_this();
+  return std::make_shared<AbstractCoreComponentStateManager>(std::move(self), uuid);
+}
+
+// Returns the deserialized state of every stored component, keyed by component id.
+// Entries that cannot be deserialized are skipped; a failing store yields an empty map.
+std::unordered_map<std::string, std::unordered_map<std::string, std::string>> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
+  std::unordered_map<std::string, std::unordered_map<std::string, std::string>> states;
+
+  std::unordered_map<std::string, std::string> stored;
+  if (getImpl(stored)) {
+    for (const auto& entry : stored) {
+      std::unordered_map<std::string, std::string> parsed;
+      if (deserialize(entry.second, parsed)) {
+        states.emplace(entry.first, std::move(parsed));
+      }
+    }
+  }
+
+  return states;
+}
+
+// Encodes kvs as a flat JSON object whose members are the keys with string values.
+std::string AbstractCoreComponentStateManagerProvider::serialize(const std::unordered_map<std::string, std::string>& kvs) {
+  rapidjson::Document document(rapidjson::kObjectType);
+  auto& allocator = document.GetAllocator();
+  for (const auto& kv : kvs) {
+    const std::string& key = kv.first;
+    const std::string& value = kv.second;
+    // StringRef does not copy; kvs outlives document, and the explicit sizes keep embedded NULs.
+    document.AddMember(rapidjson::StringRef(key.c_str(), key.size()), rapidjson::StringRef(value.c_str(), value.size()), allocator);
+  }
+
+  rapidjson::StringBuffer output;
+  rapidjson::Writer<rapidjson::StringBuffer> json_writer(output);
+  document.Accept(json_writer);
+  return output.GetString();
+}
+
+// Parses a JSON object (as produced by serialize()) into kvs.
+// Returns false and leaves kvs untouched if the input is not valid JSON, is not an object,
+// or has a member whose value is not a string.
+bool AbstractCoreComponentStateManagerProvider::deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs) {
+  rapidjson::StringStream stream(serialized.c_str());
+  rapidjson::Document doc;
+  rapidjson::ParseResult res = doc.ParseStream(stream);
+  if (!res || !doc.IsObject()) {
+    return false;
+  }
+
+  std::unordered_map<std::string, std::string> parsed;
+  for (const auto& kv : doc.GetObject()) {
+    // GetString() on a non-string value is undefined (it only asserts in debug builds), so reject such input.
+    if (!kv.value.IsString()) {
+      return false;
+    }
+    // Use the explicit lengths so strings containing "\u0000" (which serialize() can produce) are not truncated.
+    parsed[std::string(kv.name.GetString(), kv.name.GetStringLength())] = std::string(kv.value.GetString(), kv.value.GetStringLength());
+  }
+
+  kvs = std::move(parsed);
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/wel/UnicodeConversion.h b/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
similarity index 51%
copy from extensions/windows-event-log/wel/UnicodeConversion.h
copy to libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
index 01c48bc..a7176ce 100644
--- a/extensions/windows-event-log/wel/UnicodeConversion.h
+++ b/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
@@ -1,7 +1,4 @@
/**
- * @file UnicodeConversion.h
- * Unicode conversion functions
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,24 +15,39 @@
* limitations under the License.
*/
-#pragma once
+#include "controllers/keyvalue/KeyValueStoreService.h"
-#include <string>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
-#include <atlbase.h>
-#include <atlconv.h>
+// Forwards the name and id to the ControllerService base.
+KeyValueStoreService::KeyValueStoreService(const std::string& name, const std::string& id)
+ : ControllerService(name, id) {
+}
-namespace org {
- namespace apache {
- namespace nifi {
- namespace minifi {
- namespace wel {
- inline std::string to_string(const wchar_t* pChar) {
- ATL::CW2A aString(pChar, CP_UTF8);
- return std::string(aString);
- }
- } /* namespace wel */
- } /* namespace minifi */
- } /* namespace nifi */
- } /* namespace apache */
+// Forwards the name and uuid to the ControllerService base.
+KeyValueStoreService::KeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : ControllerService(name, uuid) {
+}
+
+KeyValueStoreService::~KeyValueStoreService() = default;
+
+// yield() is a no-op for key-value store services.
+void KeyValueStoreService::yield() {
+}
+
+// The service counts as running exactly while it is ENABLED.
+bool KeyValueStoreService::isRunning() {
+  const auto state = getState();
+  return state == core::controller::ControllerServiceState::ENABLED;
+}
+
+// Key-value store services never report available work.
+bool KeyValueStoreService::isWorkAvailable() {
+  return false;
+}
+
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
} /* namespace org */
diff --git a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..0a1c933
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+// NOTE(review): AbstractAutoPersistingKeyValueStoreService also initializes KeyValueStoreService directly,
+// which suggests KeyValueStoreService is a virtual base; if so, the KeyValueStoreService initializers below
+// only take effect when this class is the most-derived type — confirm.
+PersistableKeyValueStoreService::PersistableKeyValueStoreService(const std::string& name, const std::string& id)
+ : KeyValueStoreService(name, id) {
+}
+
+PersistableKeyValueStoreService::PersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+ : KeyValueStoreService(name, uuid) {
+}
+
+PersistableKeyValueStoreService::~PersistableKeyValueStoreService() {
+}
+
+// The *Impl methods are thin forwarders to this service's public key-value operations.
+bool PersistableKeyValueStoreService::setImpl(const std::string& key, const std::string& value) {
+  return this->set(key, value);
+}
+
+bool PersistableKeyValueStoreService::getImpl(const std::string& key, std::string& value) {
+  return this->get(key, value);
+}
+
+bool PersistableKeyValueStoreService::getImpl(std::unordered_map<std::string, std::string>& kvs) {
+  return this->get(kvs);
+}
+
+bool PersistableKeyValueStoreService::removeImpl(const std::string& key) {
+  return this->remove(key);
+}
+
+bool PersistableKeyValueStoreService::persistImpl() {
+  return this->persist();
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 2ef6f0b..bdc92c4 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -195,6 +195,19 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties
return true;
}
+/**
+ * Adds the given properties to the supported properties, replacing any existing property with the same name
+ * and keeping all other registered properties (unlike a full replacement of the property set).
+ * @param properties the properties to add or replace
+ * @return false if the component cannot be edited, true otherwise
+ */
+bool ConfigurableComponent::updateSupportedProperties(std::set<Property> properties) {
+  if (!canEdit()) {
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  // Iterate by const reference to avoid copying every Property before the assignment copies it again.
+  for (const auto& item : properties) {
+    properties_[item.getName()] = item;
+  }
+  return true;
+}
+
bool ConfigurableComponent::getDynamicProperty(const std::string name, std::string &value) const {
std::lock_guard<std::mutex> lock(configuration_mutex_);
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index dafa6c5..b1e9cea 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -36,6 +36,10 @@ static_initializers &get_static_functions() {
}
FlowConfiguration::~FlowConfiguration() {
+  // Break the StandardControllerServiceProvider <-> StandardControllerServiceNode shared_ptr cycle;
+  // otherwise neither side would ever be released.
+  if (service_provider_) {
+    service_provider_->clearControllerServices();
+  }
}
std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) {
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 52cf39a..51dbad5 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -34,7 +34,7 @@ void LogTestController::setLevel(const std::string name, spdlog::level::level_en
}
TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
- const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration)
+                   const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration, const char* state_dir)
: configuration_(configuration),
content_repo_(content_repo),
flow_repo_(flow_repo),
@@ -45,6 +45,23 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
flow_version_(flow_version),
logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
+  controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
+  controller_services_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  /* Create the default state manager provider before any ProcessContext so that this plan gets its own state directory */
+  if (state_dir != nullptr) {
+    state_dir_ = state_dir;
+  } else {
+    char state_dir_name_template[] = "/var/tmp/teststate.XXXXXX";
+    state_dir_ = utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  }
+  state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_, configuration_, state_dir_.c_str());
+}
+
+TestPlan::~TestPlan() {
+  for (const auto& processor : configured_processors_) {
+    processor->setScheduledState(core::ScheduledState::STOPPED);
+  }
+  controller_services_provider_->clearControllerServices();  // drops the provider <-> node shared_ptr references
}
std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
@@ -151,6 +168,31 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
}
+/* Creates a controller service of type controller_name called name; returns nullptr once the plan is finalized or if creation fails. */
+std::shared_ptr<core::controller::ControllerServiceNode> TestPlan::addController(const std::string &controller_name, const std::string &name) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> guard(mutex);
+
+  utils::Identifier uuid;
+  utils::IdGenerator::getIdGenerator()->generate(uuid);
+
+  auto node = controller_services_provider_->createControllerService(controller_name, controller_name, name, true /*firstTimeAdded*/);
+  if (!node) {
+    return nullptr;
+  }
+
+  // Remembered so that finalize() can enable it.
+  controller_service_nodes_.push_back(node);
+
+  node->initialize();
+  node->setUUID(uuid);
+  node->setName(name);
+
+  return node;
+}
+
bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
std::lock_guard<std::recursive_mutex> guard(mutex);
int32_t i = 0;
@@ -172,6 +214,18 @@ bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const st
}
}
+/* Sets `prop` on the controller service node and then on its implementation (as a dynamic property if requested).
+ * Returns the implementation's result; the node's own return value is not checked. */
+bool TestPlan::setProperty(const std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node, const std::string &prop, const std::string &value, bool dynamic /*= false*/) {
+  std::lock_guard<std::recursive_mutex> guard(mutex);  // same locking as the processor overload and addController()
+  if (dynamic) {
+    controller_service_node->setDynamicProperty(prop, value);
+    return controller_service_node->getControllerServiceImplementation()->setDynamicProperty(prop, value);
+  }
+  controller_service_node->setProperty(prop, value);
+  return controller_service_node->getControllerServiceImplementation()->setProperty(prop, value);
+}
+
void TestPlan::reset(bool reschedule) {
std::lock_guard<std::recursive_mutex> guard(mutex);
process_sessions_.clear();
@@ -283,6 +335,10 @@ void TestPlan::finalize() {
}
}
+  for (const auto& node : controller_service_nodes_) {  // enable the services collected by addController()
+    node->enable();
+  }
+
finalized = true;
}
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 73d1430..6a7fe95 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -44,6 +44,7 @@
#include "core/ProcessContextBuilder.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
+#include "core/controller/ControllerServiceNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "core/state/nodes/FlowInformation.h"
#include "properties/Configure.h"
@@ -182,7 +183,6 @@ class LogTestController {
TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
: Logger(logger) {
}
- ;
};
LogTestController()
: LogTestController(nullptr) {
@@ -228,7 +228,9 @@ class TestPlan {
public:
explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
- const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration);
+                    const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration, const char* state_dir = nullptr);  // nullptr: create a temporary state dir
+
+  virtual ~TestPlan();  // stops configured processors and clears controller services
std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false) {
@@ -249,8 +251,12 @@ class TestPlan {
std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
bool linkToPrevious = false);
+  std::shared_ptr<core::controller::ControllerServiceNode> addController(const std::string &controller_name, const std::string &name);  // nullptr if finalized or creation fails
+
bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
+  bool setProperty(const std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node, const std::string &prop, const std::string &value, bool dynamic = false);  // sets on node and implementation
+
void reset(bool reschedule = false);
bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
@@ -277,10 +283,18 @@ class TestPlan {
return logger_;
}
- protected:
+  std::string getStateDir() const {
+    return state_dir_;
+  }
+
+  std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider() const {
+    return state_manager_provider_;
+  }
void finalize();
+ protected:
+
std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
@@ -292,8 +306,13 @@ class TestPlan {
std::shared_ptr<core::Repository> flow_repo_;
std::shared_ptr<core::Repository> prov_repo_;
+  std::shared_ptr<core::controller::ControllerServiceMap> controller_services_;
std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
+  std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
+
+  std::string state_dir_;  // directory handed to the default state manager provider
+
std::recursive_mutex mutex;
std::atomic<bool> finalized;
@@ -303,6 +322,7 @@ class TestPlan {
std::shared_ptr<core::FlowFile> current_flowfile_;
std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> controller_service_nodes_;  // created by addController(), enabled by finalize()
std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
std::vector<std::shared_ptr<core::Processor>> processor_queue_;
std::vector<std::shared_ptr<core::Processor>> configured_processors_;
@@ -330,7 +350,7 @@ class TestController {
flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
}
- std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr) {
+  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr) {  // state_dir is forwarded to TestPlan
if (configuration == nullptr) {
configuration = std::make_shared<minifi::Configure>();
}
@@ -340,7 +360,7 @@ class TestController {
std::shared_ptr<core::Repository> flow_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
- return std::make_shared<TestPlan>(content_repo, flow_repo, repo, flow_version_, configuration);
+    return std::make_shared<TestPlan>(content_repo, flow_repo, repo, flow_version_, configuration, state_dir);
}
void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&)> verify =
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 91bafe8..3310b97 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -89,6 +89,7 @@ class IntegrationBase {
uint64_t wait_time_;
std::string port, scheme, path;
std::string key_dir;
+ std::string state_dir;  // temporary directory created in run() for the default state manager provider
};
IntegrationBase::IntegrationBase(uint64_t waitTime)
@@ -122,6 +123,11 @@ void IntegrationBase::run(std::string test_file_location) {
core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
+ char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";  // mutable buffer; NOTE(review): presumably rewritten in place by create_temp_directory — confirm
+ state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template);  // NOTE(review): nothing visible here removes this directory afterwards
+ core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str());  // set up before getRoot() loads the flow; return value unused
+
std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
diff --git a/libminifi/test/keyvalue-tests/CMakeLists.txt b/libminifi/test/keyvalue-tests/CMakeLists.txt
new file mode 100644
index 0000000..dfafebe
--- /dev/null
+++ b/libminifi/test/keyvalue-tests/CMakeLists.txt
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB KEYVALUE_TESTS "*.cpp")
+# Every .cpp in this directory becomes its own test executable; they are counted for the summary message.
+SET(KEYVALUE_INT_TEST_COUNT 0)
+
+FOREACH(testfile ${KEYVALUE_TESTS})
+  get_filename_component(testfilename "${testfile}" NAME_WE)
+  add_executable("${testfilename}" "${testfile}")
+  # BEFORE must precede the scope keyword; written after PRIVATE it is taken as an include directory named "BEFORE".
+  target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+  createTests("${testfilename}")
+  target_wholearchive_library(${testfilename} minifi-standard-processors)
+  if (NOT DISABLE_ROCKSDB)
+    target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+  endif()
+  MATH(EXPR KEYVALUE_INT_TEST_COUNT "${KEYVALUE_INT_TEST_COUNT}+1")
+ENDFOREACH()
+
+message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller test file(s)...")
+# The same binary runs once per key-value store backend; --config-yaml selects the controller service configuration.
+add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+if (NOT DISABLE_ROCKSDB)
+  add_test(NAME RocksDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+endif()
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
new file mode 100644
index 0000000..a30de53
--- /dev/null
+++ b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
@@ -0,0 +1,254 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_RUNNER
+#include "catch.hpp"
+
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include <array>
+#include "../TestBase.h"
+#include "../../controller/Controller.h"
+#include "core/controller/ControllerService.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+static std::string config_yaml;  // NOLINT
+
+/* Catch option callback: remembers the value given to --config-yaml. */
+static inline void configYamlHandler(Catch::ConfigData&, const std::string& path) {
+  config_yaml = path;
+}
+
+int main(int argc, char* argv[]) {
+  Catch::Session session;
+  // Register the mandatory --config-yaml option that selects the key-value store configuration under test.
+  auto& cli = const_cast<Catch::Clara::CommandLine<Catch::ConfigData>&>(session.cli());
+  cli["--config-yaml"]
+      .describe("path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration")
+      .bind(&configYamlHandler, "path");
+
+  const int parse_result = session.applyCommandLine(argc, argv);
+  if (parse_result != 0) {
+    return parse_result;
+  }
+
+  if (!config_yaml.empty()) {
+    return session.run();
+  }
+  std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration." << std::endl;
+  return -1;
+}
+
+class PersistableKeyValueStoreServiceTestsFixture {
+ public:
+  PersistableKeyValueStoreServiceTestsFixture() {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
+    LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+
+    // Work inside a fresh temporary directory so relative state paths from the YAML end up there
+    char format[] = "/var/tmp/state.XXXXXX";
+    state_dir = testController.createTempDirectory(format);
+    REQUIRE(false == state_dir.empty());
+    REQUIRE(0 == chdir(state_dir.c_str()));
+
+    loadYaml();
+  }
+
+  virtual ~PersistableKeyValueStoreServiceTestsFixture() {
+    LogTestController::getInstance().reset();
+  }
+  // (Re)builds the flow from config_yaml, then looks up, enables and casts the "testcontroller" service.
+  void loadYaml() {
+    controller.reset();
+    persistable_key_value_store_service_node.reset();
+
+    process_group.reset();
+    yaml_config.reset();
+
+    stream_factory.reset();
+    content_repo.reset();
+    test_flow_repo.reset();
+    test_repo.reset();
+    configuration.reset();
+
+    configuration = std::make_shared<minifi::Configure>();
+    test_repo = std::make_shared<TestRepository>();
+    test_flow_repo = std::make_shared<TestFlowRepository>();
+
+    configuration->set(minifi::Configure::nifi_flow_configuration_file, config_yaml);
+
+    content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(configuration);
+    stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+
+    yaml_config = std::unique_ptr<core::YamlConfiguration>(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml));
+
+    process_group = yaml_config->getRoot(config_yaml);
+    persistable_key_value_store_service_node = process_group->findControllerService("testcontroller");
+    REQUIRE(persistable_key_value_store_service_node != nullptr);
+    persistable_key_value_store_service_node->enable();
+
+    controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(
+        persistable_key_value_store_service_node->getControllerServiceImplementation());
+    REQUIRE(controller != nullptr);
+  }
+
+ protected:
+  // Declared first so it is destroyed last, after the services below that use its temp dir (presumably removed by its destructor).
+  TestController testController;
+  std::string state_dir;
+  std::shared_ptr<minifi::Configure> configuration;
+  std::shared_ptr<core::Repository> test_repo;
+  std::shared_ptr<core::Repository> test_flow_repo;
+  std::shared_ptr<core::ContentRepository> content_repo;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory;
+
+  std::unique_ptr<core::YamlConfiguration> yaml_config;
+  std::unique_ptr<core::ProcessGroup> process_group;
+
+  std::shared_ptr<core::controller::ControllerServiceNode> persistable_key_value_store_service_node;
+  std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+};
+
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") {
+  const char* const key = "foobar";
+  const char* const value = "234";
+  REQUIRE(controller->set(key, value));
+  // Read back either from the live service or from one rebuilt by loadYaml() after persisting.
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(key, fetched));
+  REQUIRE(fetched == value);
+}
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture special characters", "[basic]") {
+  const char* const key = "[]{}()==\\=\n\n";
+  const char* const value = ":./'\\=!\n=[]{}()";
+  REQUIRE(controller->set(key, value));
+  // Brackets, '=', backslashes and newlines must survive the store (and its persisted form) unchanged.
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(key, fetched));
+  REQUIRE(fetched == value);
+}
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get all", "[basic]") {
+  const std::unordered_map<std::string, std::string> kvs = {
+    {"foobar", "234"},
+    {"buzz", "value"},
+  };
+  for (const auto& entry : kvs) {
+    REQUIRE(controller->set(entry.first, entry.second));
+  }
+  // The bulk getter must return exactly what was stored.
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::unordered_map<std::string, std::string> loaded;
+  REQUIRE(controller->get(loaded));
+  REQUIRE(loaded == kvs);
+}
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and overwrite", "[basic]") {
+  const char* const key = "foobar";
+  const char* const value = "234";
+  const char* const new_value = "baz";
+  REQUIRE(controller->set(key, value));
+  REQUIRE(controller->set(key, new_value));  // the second write must replace the first
+
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(key, fetched));
+  REQUIRE(fetched == new_value);
+}
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and remove", "[basic]") {
+  const char* const key = "foobar";
+  const char* const value = "234";
+  REQUIRE(controller->set(key, value));
+  REQUIRE(controller->remove(key));
+
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE_FALSE(controller->get(key, fetched));  // a removed key must no longer be found
+}
+
+
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and clear", "[basic]") {
+  const std::unordered_map<std::string, std::string> kvs = {
+    {"foobar", "234"},
+    {"buzz", "value"},
+  };
+  for (const auto& kv : kvs) {
+    REQUIRE(true == controller->set(kv.first, kv.second));
+  }
+  REQUIRE(true == controller->clear());
+
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::unordered_map<std::string, std::string> kvs_res;
+  REQUIRE(true == controller->get(kvs_res));  // read the store back; an untouched local map would always be empty
+  REQUIRE(0U == kvs_res.size());
+  /* Make sure we can still insert after we cleared */
+  const char* key = "foo";
+  const char* value = "bar";
+  REQUIRE(true == controller->set(key, value));
+  std::string res;
+  REQUIRE(true == controller->get(key, res));
+  REQUIRE(value == res);
+}
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
similarity index 63%
copy from extensions/rocksdb-repos/CMakeLists.txt
copy to libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
index 3c5395e..5819063 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
@@ -16,16 +16,21 @@
# specific language governing permissions and limitations
# under the License.
#
+Flow Controller:
+ name: MiNiFi Flow
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+Processors: []
-file(GLOB SOURCES "*.cpp")
+Connections: []
-add_library(minifi-rocksdb-repos STATIC ${SOURCES})
-set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+Controller Services:
+ - name: testcontroller  # PersistableKeyValueStoreServiceTest looks the service up by this name
+ id: 2438e3c8-015a-1000-79ca-83af40ec1994
+ class: RocksDbPersistableKeyValueStoreService
+ Properties:
+ Auto Persistence Interval:
+ - value: 0 sec  # NOTE(review): presumably disables periodic persistence; the tests call persist() explicitly
+ Directory:
+ - value: state  # relative path; the test fixture chdir()s into a fresh temporary directory first
-target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-rocksdb-repos RocksDB::RocksDB)
-
-SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
-register_extension(minifi-rocksdb-repos)
+Remote Processing Groups: []
diff --git a/libminifi/test/resources/TestC2DescribeCoreComponentState.yml b/libminifi/test/resources/TestC2DescribeCoreComponentState.yml
new file mode 100644
index 0000000..8565917
--- /dev/null
+++ b/libminifi/test/resources/TestC2DescribeCoreComponentState.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+ name: MiNiFi Flow
+ id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:  # NOTE(review): presumably two TailFile processors so the C2 describe request reports state for more than one component — confirm
+ - name: TailFile1
+ id: 2438e3c8-015a-1000-79ca-83af40ec1993
+ class: org.apache.nifi.processors.standard.TailFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: success
+ - name: TailFile2
+ id: 2438e3c8-015a-1000-79ca-83af40ec1994
+ class: org.apache.nifi.processors.standard.TailFile
+ max concurrent tasks: 1
+ scheduling strategy: TIMER_DRIVEN
+ scheduling period: 1 sec
+ penalization period: 30 sec
+ yield period: 1 sec
+ run duration nanos: 0
+ auto-terminated relationships list: success
+
+Connections: []
+
+Remote Processing Groups: []
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
similarity index 63%
copy from extensions/rocksdb-repos/CMakeLists.txt
copy to libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
index 3c5395e..61d8f26 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
@@ -16,16 +16,21 @@
# specific language governing permissions and limitations
# under the License.
#
+Flow Controller:
+ name: MiNiFi Flow
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+Processors: []
-file(GLOB SOURCES "*.cpp")
+Connections: []
-add_library(minifi-rocksdb-repos STATIC ${SOURCES})
-set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+Controller Services:
+ - name: testcontroller  # PersistableKeyValueStoreServiceTest looks the service up by this name
+ id: 2438e3c8-015a-1000-79ca-83af40ec1994
+ class: UnorderedMapPersistableKeyValueStoreService
+ Properties:
+ Auto Persistence Interval:
+ - value: 0 sec  # NOTE(review): presumably disables periodic persistence; the tests call persist() explicitly
+ File:
+ - value: state.txt  # relative path; the test fixture chdir()s into a fresh temporary directory first
-target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-rocksdb-repos RocksDB::RocksDB)
-
-SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
-register_extension(minifi-rocksdb-repos)
+Remote Processing Groups: []