You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ab...@apache.org on 2020/04/27 19:49:10 UTC

[nifi-minifi-cpp] branch master updated: MINIFICPP-550 - Implement RocksDB controller service and component state storage

This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2bf6a  MINIFICPP-550 - Implement RocksDB controller service and component state storage
fb2bf6a is described below

commit fb2bf6ac31e48f88b5b01cd2b75a1b74ca5c1e30
Author: Daniel Bakai <ba...@gmail.com>
AuthorDate: Thu Jul 4 12:30:37 2019 +0200

    MINIFICPP-550 - Implement RocksDB controller service and component state storage
    
    Signed-off-by: Arpad Boda <ab...@apache.org>
    
    This closes #605
---
 CMakeLists.txt                                     |   3 +
 conf/minifi.properties                             |  11 +
 .../tests/C2DescribeCoreComponentStateTest.cpp     | 116 +++++++
 extensions/http-curl/tests/CMakeLists.txt          |   1 +
 extensions/rocksdb-repos/CMakeLists.txt            |   2 +-
 extensions/rocksdb-repos/RocksDBLoader.h           |   5 +
 .../RocksDbPersistableKeyValueStoreService.cpp     | 193 +++++++++++
 .../RocksDbPersistableKeyValueStoreService.h       |  87 +++++
 extensions/sftp/processors/ListSFTP.cpp            | 385 +++++++++------------
 extensions/sftp/processors/ListSFTP.h              |  14 +-
 extensions/sftp/tests/CMakeLists.txt               |   3 +
 extensions/sftp/tests/FetchSFTPTests.cpp           |   4 +-
 extensions/sftp/tests/ListSFTPTests.cpp            | 119 ++++++-
 extensions/sftp/tests/ListThenFetchSFTPTests.cpp   |   4 +-
 extensions/sftp/tests/PutSFTPTests.cpp             |   6 +-
 extensions/sql/processors/ExecuteSQL.cpp           |   4 +-
 extensions/sql/processors/ExecuteSQL.h             |   4 +-
 extensions/sql/processors/PutSQL.cpp               |   2 +-
 extensions/sql/processors/PutSQL.h                 |   2 +-
 extensions/sql/processors/QueryDatabaseTable.cpp   | 177 ++++------
 extensions/sql/processors/QueryDatabaseTable.h     |   8 +-
 extensions/standard-processors/CMakeLists.txt      |   2 +-
 .../UnorderedMapKeyValueStoreService.cpp           | 111 ++++++
 .../controllers/UnorderedMapKeyValueStoreService.h |  75 ++++
 ...UnorderedMapPersistableKeyValueStoreService.cpp | 260 ++++++++++++++
 .../UnorderedMapPersistableKeyValueStoreService.h  |  88 +++++
 .../standard-processors/processors/TailFile.cpp    | 114 ++++--
 .../standard-processors/processors/TailFile.h      |   9 +-
 .../tests/unit/TailFileTests.cpp                   | 254 +++++++-------
 extensions/windows-event-log/Bookmark.cpp          | 116 +++----
 extensions/windows-event-log/Bookmark.h            |   9 +-
 .../windows-event-log/ConsumeWindowsEventLog.cpp   |  13 +-
 .../windows-event-log/ConsumeWindowsEventLog.h     |   4 +-
 .../windows-event-log/wel/UnicodeConversion.h      |   5 +
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/FlowController.h                 |  10 +
 .../AbstractAutoPersistingKeyValueStoreService.h   |  75 ++++
 .../AbstractCoreComponentStateManagerProvider.h    |  76 ++++
 .../controllers/keyvalue/KeyValueStoreService.h    |  66 ++++
 .../keyvalue/PersistableKeyValueStoreService.h     |  56 +++
 libminifi/include/core/ConfigurableComponent.h     |   8 +-
 libminifi/include/core/CoreComponentState.h        |  67 ++++
 libminifi/include/core/ProcessContext.h            | 113 ++++++
 .../include/core/controller/ControllerService.h    |   3 +
 .../core/controller/ControllerServiceProvider.h    |   7 +
 .../controller/StandardControllerServiceProvider.h |  14 +-
 libminifi/include/properties/Configure.h           |   5 +
 libminifi/src/Configure.cpp                        |   3 +
 libminifi/src/FlowController.cpp                   |  16 +
 libminifi/src/c2/C2Agent.cpp                       |  47 +++
 .../AbstractAutoPersistingKeyValueStoreService.cpp | 135 ++++++++
 .../AbstractCoreComponentStateManagerProvider.cpp  | 143 ++++++++
 .../controllers/keyvalue/KeyValueStoreService.cpp  |  52 +--
 .../keyvalue/PersistableKeyValueStoreService.cpp   |  60 ++++
 libminifi/src/core/ConfigurableComponent.cpp       |  13 +
 libminifi/src/core/FlowConfiguration.cpp           |   4 +
 libminifi/test/TestBase.cpp                        |  58 +++-
 libminifi/test/TestBase.h                          |  30 +-
 libminifi/test/integration/IntegrationBase.h       |   6 +
 libminifi/test/keyvalue-tests/CMakeLists.txt       |  42 +++
 .../PersistableKeyValueStoreServiceTest.cpp        | 254 ++++++++++++++
 .../RocksDbPersistableKeyValueStoreServiceTest.yml |  23 +-
 .../resources/TestC2DescribeCoreComponentState.yml |  46 +++
 ...deredMapPersistableKeyValueStoreServiceTest.yml |  23 +-
 64 files changed, 2977 insertions(+), 690 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5283124..f394bd2 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -705,6 +705,9 @@ if (NOT SKIP_TESTS)
 	include(BuildTests)
 endif()
 
+## Add KeyValueStoreService tests
+registerTest("${TEST_DIR}/keyvalue-tests")
+
 include(BuildDocs)
 
 include(DockerConfig)
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 3fa2c19..9580c75 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -37,6 +37,17 @@ nifi.database.content.repository.directory.default=${MINIFI_HOME}/content_reposi
 #nifi.rest.api.user.name=admin
 #nifi.rest.api.password=password
 
+# State storage configuration #
+## The default state storage can be overridden by specifying a controller service instance
+## that implements CoreComponentStateManagerProvider
+## (e.g. an instance of RocksDbPersistableKeyValueStoreService or UnorderedMapPersistableKeyValueStoreService)
+#nifi.state.management.provider.local=
+## To make the default state storage persist every state change, set this to true.
+## This comes at a performance penalty, but makes sure no state is lost even on unclean shutdowns.
+#nifi.state.management.provider.local.always.persist=true
+## To change the frequency at which the default state storage is persisted, modify the following
+#nifi.state.management.provider.local.auto.persistence.interval=1 min
+
 ## Enabling C2 Uncomment each of the following options
 ## define those with missing options
 #nifi.c2.enable=true
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
new file mode 100644
index 0000000..b0e7d7d
--- /dev/null
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <string>
+#include "TestBase.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+#include "processors/TailFile.h"
+#include "state/ProcessorController.h"
+#include "utils/file/FileUtils.h"
+
+class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {  // C2 describe harness that creates two temp files and points the TailFile processors at them
+ public:
+  explicit VerifyC2DescribeCoreComponentState(bool isSecure)  // isSecure is forwarded unchanged to the VerifyC2Describe base
+      : VerifyC2Describe(isSecure) {
+    char format[] = "/var/tmp/ssth.XXXXXX";  // directory-name template handed to createTempDirectory
+    temp_dir_ = testController.createTempDirectory(format);
+
+    test_file_1_ = utils::file::FileUtils::concat_path(temp_dir_, "test1.txt");
+    test_file_2_ = utils::file::FileUtils::concat_path(temp_dir_, "test2.txt");
+
+    std::ofstream f1(test_file_1_);
+    f1 << "foo";  // 3 bytes: the handler in this file expects position "3" for test1.txt
+
+    std::ofstream f2(test_file_2_);
+    f2 << "foobar";  // 6 bytes: the handler in this file expects position "6" for test2.txt
+  }
+
+ protected:
+  void updateProperties(std::shared_ptr<minifi::FlowController> flow_controller) override {  // sets TailFile::FileName on the components named TailFile1 and TailFile2
+    std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile1")[0])  // NOTE(review): assumes the lookup returns at least one ProcessorController - confirm against the test flow
+        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_1_);
+    std::dynamic_pointer_cast<minifi::state::ProcessorController>(flow_controller->getComponents("TailFile2")[0])
+        ->getProcessor()->setProperty(minifi::processors::TailFile::FileName, test_file_2_);
+  }
+
+  TestController testController;  // used here to create the temporary directory
+  std::string temp_dir_;  // directory returned by createTempDirectory
+  std::string test_file_1_;  // <temp_dir_>/test1.txt, content "foo"
+  std::string test_file_2_;  // <temp_dir_>/test2.txt, content "foobar"
+};
+
+class DescribeCoreComponentStateHandler: public HeartbeatHandler {  // test responder: requests a corecomponentstate description and validates the acknowledgement
+ public:
+
+  explicit DescribeCoreComponentStateHandler(bool isSecure)  // isSecure is forwarded unchanged to HeartbeatHandler
+      : HeartbeatHandler(isSecure) {
+  }
+
+  virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {  // the heartbeat payload itself is ignored
+    sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);  // NOTE(review): "889345" is presumably the operation id - confirm against HeartbeatHandler
+  }
+
+  virtual void handleAcknowledge(const rapidjson::Document& root) {  // asserts stay active because this file undefines NDEBUG before its includes
+    assert(root.HasMember("corecomponentstate"));
+
+    auto assertExpectedTailFileState = [&](const char* uuid, const char* name, const char* position) {  // checks the state entry stored under the given uuid
+      assert(root["corecomponentstate"].HasMember(uuid));
+      const auto& tf = root["corecomponentstate"][uuid];
+      assert(tf.HasMember("file.0.name"));
+      assert(std::string(tf["file.0.name"].GetString()) == name);
+      assert(tf.HasMember("file.0.position"));
+      assert(std::string(tf["file.0.position"].GetString()) == position);  // position is compared as a string, not as a number
+      assert(tf.HasMember("file.0.current"));
+      assert(strlen(tf["file.0.current"].GetString()) > 0U);  // only non-emptiness is checked for file.0.current
+    };
+
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1993", "test1.txt", "3");  // test1.txt holds "foo"
+    assertExpectedTailFileState("2438e3c8-015a-1000-79ca-83af40ec1994", "test2.txt", "6");  // test2.txt holds "foobar"
+  }
+};
+
+int main(int argc, char **argv) {  // usage: <test binary> [test_file_location [key_dir]]
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";  // default: plain HTTP
+  if (argc > 1) {
+    test_file_location = argv[1];  // later passed to harness.run()
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";  // a second argument switches the URL to HTTPS
+      key_dir = argv[2];
+    }
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {  // secure mode is derived from the URL chosen above
+    isSecure = true;
+  }
+
+  VerifyC2DescribeCoreComponentState harness(isSecure);
+
+  harness.setKeyDir(key_dir);  // empty string when no key directory argument was given
+
+  DescribeCoreComponentStateHandler responder(isSecure);
+
+  harness.setUrl(url, &responder);  // responder is passed by pointer and is declared in this scope, so it outlives run()
+
+  harness.run(test_file_location);
+
+  return 0;  // failures surface through the assert() calls in the handler, not through the exit code
+}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index ff848b5..f01c4f9 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -73,6 +73,7 @@ add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESO
 add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2DescribeCoreComponentStateTest COMMAND C2DescribeCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/extensions/rocksdb-repos/CMakeLists.txt
index 3c5395e..cd6b9ab 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/extensions/rocksdb-repos/CMakeLists.txt
@@ -19,7 +19,7 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
 
-file(GLOB SOURCES  "*.cpp")
+file(GLOB SOURCES  "*.cpp" "controllers/*.cpp")
 
 add_library(minifi-rocksdb-repos STATIC ${SOURCES})
 set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/extensions/rocksdb-repos/RocksDBLoader.h b/extensions/rocksdb-repos/RocksDBLoader.h
index 4d3f30f..5ed957f 100644
--- a/extensions/rocksdb-repos/RocksDBLoader.h
+++ b/extensions/rocksdb-repos/RocksDBLoader.h
@@ -22,6 +22,7 @@
 #include "FlowFileRepository.h"
 #include "ProvenanceRepository.h"
 #include "RocksDbStream.h"
+#include "controllers/RocksDbPersistableKeyValueStoreService.h"
 #include "core/ClassLoader.h"
 
 class RocksDBFactory : public core::ObjectFactory {
@@ -50,9 +51,11 @@ class RocksDBFactory : public core::ObjectFactory {
     class_names.push_back("DatabaseContentRepository");
     class_names.push_back("FlowFileRepository");
     class_names.push_back("ProvenanceRepository");
+    class_names.push_back("RocksDbPersistableKeyValueStoreService");
     class_names.push_back("databasecontentrepository");
     class_names.push_back("flowfilerepository");
     class_names.push_back("provenancerepository");
+    class_names.push_back("rocksdbpersistablekeyvaluestoreservice");
     return class_names;
   }
 
@@ -65,6 +68,8 @@ class RocksDBFactory : public core::ObjectFactory {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<core::repository::FlowFileRepository>());
     } else if (name == "provenancerepository") {
       return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::provenance::ProvenanceRepository>());
+    } else if (name == "rocksdbpersistablekeyvaluestoreservice") {
+      return std::unique_ptr<ObjectFactory>(new core::DefautObjectFactory<minifi::controllers::RocksDbPersistableKeyValueStoreService>());
     } else {
       return nullptr;
     }
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..ef04230
--- /dev/null
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.cpp
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RocksDbPersistableKeyValueStoreService.h"
+
+#include "utils/StringUtils.h"
+
+#include <fstream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+core::Property RocksDbPersistableKeyValueStoreService::Directory(  // required property naming the directory of the RocksDB database
+    core::PropertyBuilder::createProperty("Directory")->withDescription("Path to a directory for the database")
+        ->isRequired(true)->build());  // no default value is configured
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id)  // constructs the service from a name and a string id
+    : controllers::KeyValueStoreService(name, id)  // NOTE(review): initialized directly here, so presumably a virtual base - confirm in KeyValueStoreService.h
+    , controllers::AbstractAutoPersistingKeyValueStoreService(name, id)
+    , logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
+}
+
+RocksDbPersistableKeyValueStoreService::RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)  // constructs the service from a name and an Identifier
+    : controllers::KeyValueStoreService(name, uuid)  // NOTE(review): initialized directly here, so presumably a virtual base - confirm in KeyValueStoreService.h
+    , controllers::AbstractAutoPersistingKeyValueStoreService(name, uuid)
+    , logger_(logging::LoggerFactory<RocksDbPersistableKeyValueStoreService>::getLogger()) {
+}
+
+void RocksDbPersistableKeyValueStoreService::initialize() {  // runs the base initialization, then registers the Directory property
+  AbstractAutoPersistingKeyValueStoreService::initialize();
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(Directory);
+  updateSupportedProperties(supportedProperties);  // NOTE(review): presumably adds to the properties registered by the base class rather than replacing them - confirm
+}
+
+void RocksDbPersistableKeyValueStoreService::onEnable() {  // reads Directory and opens (creating if missing) the RocksDB database there
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable RocksDbPersistableKeyValueStoreService");
+    return;
+  }
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  if (!getProperty(Directory.getName(), directory_)) {
+    logger_->log_error("Invalid or missing property: Directory");
+    return;
+  }
+
+  db_.reset();  // drop any handle left over from a previous enable before reopening
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.use_direct_reads = true;
+  // Use the same buffer settings as the FlowFileRepository
+  options.write_buffer_size = 8 << 20;  // 8 MiB
+  options.max_write_buffer_number = 20;
+  options.min_write_buffer_number_to_merge = 1;
+  if (!always_persist_) {
+    options.manual_wal_flush = true;  // the WAL is then flushed explicitly by persist()
+  }
+  rocksdb::DB* db = nullptr;
+  rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
+  if (status.ok()) {
+    db_.reset(db);
+    logger_->log_trace("Successfully opened RocksDB database at %s", directory_.c_str());
+  } else {
+    logger_->log_error("Failed to open RocksDB database at %s, error: %s", directory_.c_str(), status.ToString().c_str());  // ToString(): getState() is only the optional message and may be null
+    return;  // db_ stays empty, so every store operation returns false
+  }
+
+  if (always_persist_) {
+    default_write_options.sync = true;  // every Put/Delete is synced, so persist() has nothing left to flush
+  }
+
+  logger_->log_trace("Enabled RocksDbPersistableKeyValueStoreService");
+}
+
+void RocksDbPersistableKeyValueStoreService::notifyStop() {  // stops the base service, then releases the database handle
+  AbstractAutoPersistingKeyValueStoreService::notifyStop();
+
+  db_.reset();  // after this every store operation returns false until onEnable() reopens the database
+}
+
+bool RocksDbPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {  // stores value under key; returns false if the database is not open or the Put fails
+  if (!db_) {
+    return false;
+  }
+  rocksdb::Status status = db_->Put(default_write_options, key, value);  // synced write when always_persist_ was set at enable time
+  if (!status.ok()) {
+    logger_->log_error("Failed to Put key %s to RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());  // ToString(): getState() may be null and omits the status code
+    return false;
+  }
+  return true;
+}
+
+bool RocksDbPersistableKeyValueStoreService::get(const std::string& key, std::string& value) {  // reads the value stored under key into value; returns false if the database is not open or the Get fails
+  if (!db_) {
+    return false;
+  }
+  rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &value);
+  if (!status.ok()) {  // a missing key also takes this branch, since Get reports it as a non-ok status
+    logger_->log_error("Failed to Get key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());  // ToString(): getState() is null for a message-less status such as NotFound
+    return false;
+  }
+  return true;
+}
+
+bool RocksDbPersistableKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {  // replaces the contents of kvs with every key/value pair in the database
+  if (!db_) {
+    return false;
+  }
+  kvs.clear();  // the output map is emptied before iteration starts
+  std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    kvs.emplace(it->key().ToString(), it->value().ToString());
+  }
+  if (!it->status().ok()) {  // an iterator error leaves kvs holding only the pairs read so far
+    logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), it->status().ToString().c_str());  // ToString(): getState() may be null and omits the status code
+    return false;
+  }
+  return true;
+}
+
+bool RocksDbPersistableKeyValueStoreService::remove(const std::string& key) {  // deletes key; returns false if the database is not open or the Delete fails
+  if (!db_) {
+    return false;
+  }
+  rocksdb::Status status = db_->Delete(default_write_options, key);
+  if (!status.ok()) {
+    logger_->log_error("Failed to Delete key %s from RocksDB database at %s, error: %s", key.c_str(), directory_.c_str(), status.ToString().c_str());  // key is logged like in set()/get(); ToString() avoids a possibly-null getState()
+    return false;
+  }
+  return true;
+}
+
+bool RocksDbPersistableKeyValueStoreService::clear() {  // deletes every key found by a full iteration; returns false on the first failure
+  if (!db_) {
+    return false;
+  }
+  std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    rocksdb::Status status = db_->Delete(default_write_options, it->key());  // keys are deleted one at a time, not as a single batch
+    if (!status.ok()) {
+      logger_->log_error("Failed to Delete from RocksDB database at %s, error: %s", directory_.c_str(), status.ToString().c_str());  // ToString(): getState() may be null and omits the status code
+      return false;  // keys deleted before the failure stay deleted
+    }
+  }
+  if (!it->status().ok()) {
+    logger_->log_error("Encountered error when iterating through RocksDB database at %s, error: %s", directory_.c_str(), it->status().ToString().c_str());  // same reason as above
+    return false;
+  }
+  return true;
+}
+
+bool RocksDbPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {  // not implemented for this store: key and update_func are never used
+  if (!db_) {
+    return false;  // a store that is not open reports failure instead of throwing
+  }
+  throw std::logic_error("Unsupported method");  // always thrown when the database is open
+}
+
+bool RocksDbPersistableKeyValueStoreService::persist() {  // flushes and syncs the write-ahead log; returns false if the database is not open or the flush fails
+  if (!db_) {
+    return false;
+  }
+  if (always_persist_) {
+    return true;  // onEnable() set default_write_options.sync in this mode, so there is nothing to flush here
+  }
+  return db_->FlushWAL(true /*sync*/).ok();  // pairs with manual_wal_flush being enabled in onEnable()
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
new file mode 100644
index 0000000..65ea554
--- /dev/null
+++ b/extensions/rocksdb-repos/controllers/RocksDbPersistableKeyValueStoreService.h
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+
+#include <unordered_map>
+#include <string>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class RocksDbPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService {  // key-value store service backed by a RocksDB database
+ public:
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit RocksDbPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  virtual ~RocksDbPersistableKeyValueStoreService() = default;
+
+  static core::Property Directory;  // required property: path to the database directory
+
+  virtual void initialize() override;  // registers the Directory property
+  virtual void onEnable() override;  // opens the database at Directory
+  virtual void notifyStop() override;  // releases the database handle
+
+  virtual bool set(const std::string& key, const std::string& value) override;  // returns false when the database is not open or the write fails
+
+  virtual bool get(const std::string& key, std::string& value) override;  // returns false when the database is not open or the read fails
+
+  virtual bool get(std::unordered_map<std::string, std::string>& kvs) override;  // clears kvs, then fills it with every stored pair
+
+  virtual bool remove(const std::string& key) override;  // returns false when the database is not open or the delete fails
+
+  virtual bool clear() override;  // deletes every stored key
+
+  virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;  // unsupported: throws std::logic_error when the database is open
+
+  virtual bool persist() override;  // flushes the write-ahead log unless always_persist_ is set
+
+ protected:
+  std::string directory_;  // value of the Directory property, read in onEnable()
+
+  std::unique_ptr<rocksdb::DB> db_;  // empty until onEnable() succeeds and again after notifyStop()
+  rocksdb::WriteOptions default_write_options;  // used for every Put/Delete; sync is enabled when always_persist_ is set
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(RocksDbPersistableKeyValueStoreService, "A key-value service implemented by RocksDB");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CONTROLLERS_KEYVALUE_RocksDbPersistableKeyValueStoreService_H_ */
diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp
index 6a00d03..6ac89f2 100644
--- a/extensions/sftp/processors/ListSFTP.cpp
+++ b/extensions/sftp/processors/ListSFTP.cpp
@@ -125,10 +125,6 @@ core::Property ListSFTP::MinimumFileSize(
 core::Property ListSFTP::MaximumFileSize(
     core::PropertyBuilder::createProperty("Maximum File Size")->withDescription("The maximum size that a file must be in order to be pulled")
         ->isRequired(false)->build());
-core::Property ListSFTP::StateFile(
-    core::PropertyBuilder::createProperty("State File")->withDescription("Specifies the file that should be used for storing state about"
-                                                                         " what data has been ingested so that upon restart MiNiFi can resume from where it left off")
-        ->isRequired(true)->withDefaultValue("ListSFTP")->build());
 
 core::Relationship ListSFTP::Success("success", "All FlowFiles that are received are routed to success");
 
@@ -170,7 +166,6 @@ void ListSFTP::initialize() {
   properties.insert(MaximumFileAge);
   properties.insert(MinimumFileSize);
   properties.insert(MaximumFileSize);
-  properties.insert(StateFile);
   setSupportedProperties(properties);
 
   // Set the supported relationships
@@ -203,6 +198,11 @@ ListSFTP::~ListSFTP() {
 void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   parseCommonPropertiesOnSchedule(context);
 
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
   std::string value;
   context->getProperty(ListingStrategy.getName(), listing_strategy_);
   if (!last_listing_strategy_.empty() && last_listing_strategy_ != listing_strategy_) {
@@ -270,45 +270,6 @@ void ListSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
       logger_->log_error("Maximum File Size attribute is invalid");
     }
   }
-  context->getProperty(StateFile.getName(), value);
-  if (listing_strategy_ == LISTING_STRATEGY_TRACKING_TIMESTAMPS) {
-    std::stringstream ss;
-    ss << value << "." << getUUIDStr() << ".TrackingTimestamps";
-    auto new_tracking_timestamps_state_filename = ss.str();
-    if (new_tracking_timestamps_state_filename != tracking_timestamps_state_filename_) {
-      if (!tracking_timestamps_state_filename_.empty()) {
-        if (unlink(tracking_timestamps_state_filename_.c_str()) != 0) {
-          logger_->log_error("Unable to delete old Tracking Timestamps state file \"%s\"",
-                             tracking_timestamps_state_filename_.c_str());
-        }
-      }
-    }
-    tracking_timestamps_state_filename_ = new_tracking_timestamps_state_filename;
-  } else if (listing_strategy_ == LISTING_STRATEGY_TRACKING_ENTITIES) {
-    std::stringstream ss;
-    ss << value << "." << getUUIDStr() << ".TrackingEntities";
-    auto new_tracking_entities_state_filename = ss.str();
-    ss << ".json";
-    auto new_tracking_entities_state_json_filename = ss.str();
-    if (new_tracking_entities_state_filename != tracking_entities_state_filename_) {
-      if (!tracking_entities_state_filename_.empty()) {
-        if (unlink(tracking_entities_state_filename_.c_str()) != 0) {
-          logger_->log_error("Unable to delete old Tracking Entities state file \"%s\"",
-                             tracking_entities_state_filename_.c_str());
-        }
-      }
-      if (!tracking_entities_state_json_filename_.empty()) {
-        if (unlink(tracking_entities_state_json_filename_.c_str()) != 0) {
-          logger_->log_error("Unable to delete old Tracking Entities json state file \"%s\"",
-                             tracking_entities_state_json_filename_.c_str());
-        }
-      }
-    }
-    tracking_entities_state_filename_ = new_tracking_entities_state_filename;
-    tracking_entities_state_json_filename_ = new_tracking_entities_state_json_filename;
-  } else {
-    logger_->log_error("Unknown Listing Strategy: \"%s\"", listing_strategy_.c_str());
-  }
 
   startKeepaliveThreadIfNeeded();
 }
@@ -520,31 +481,24 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, uint64_t size_)
     , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store state to Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << "\n";
+bool ListSFTP::persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-    file << "id." << i << "=" << identifier << "\n";
+    state["id." + std::to_string(i)] = identifier;
     ++i;
   }
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
@@ -552,53 +506,68 @@ bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, co
   uint64_t state_processed_timestamp;
   std::set<std::string> state_ids;
 
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), line.c_str());
-    }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "listing.timestamp") {
-      try {
-        state_listing_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("listing.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
-      }
-    } else if (key == "processed.timestamp") {
-      try {
-        state_processed_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("processed.timestamp is not an uint64 in Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
-      }
-    } else if (key.compare(0, strlen("id."), "id.") == 0) {
-      state_ids.emplace(std::move(value));
-    } else {
-      logger_->log_warn("Unknown key found in Tracking Timestamps state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), key.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_info("Found no stored state");
+    return false;
+  }
+
+  try {
+    state_listing_strategy = state_map.at("listing_strategy");
+  } catch (...) {
+    logger_->log_error("listing_strategy is missing from state");
+    return false;
+  }
+  try {
+    state_hostname = state_map.at("hostname");
+  } catch (...) {
+    logger_->log_error("hostname is missing from state");
+    return false;
+  }
+  try {
+    state_username = state_map.at("username");
+  } catch (...) {
+    logger_->log_error("username is missing from state");
+    return false;
+  }
+  try {
+    state_remote_path = state_map.at("remote_path");
+  } catch (...) {
+    logger_->log_error("remote_path is missing from state");
+    return false;
+  }
+  try {
+    state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
+  } catch (...) {
+    logger_->log_error("listing.timestamp is missing from state or is invalid");
+    return false;
+  }
+  try {
+    state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
+  } catch (...) {
+    logger_->log_error("processed.timestamp is missing from state or is invalid");
+    return false;
+  }
+  for (const auto &kv : state_map) {
+    if (kv.first.compare(0, strlen("id."), "id.") == 0) {
+      state_ids.emplace(kv.second);
     }
   }
-  file.close();
 
-  if (state_hostname != hostname ||
+  if (state_listing_strategy != listing_strategy_ ||
+      state_hostname != hostname ||
       state_username != username ||
       state_remote_path != remote_path) {
-    logger_->log_error("Tracking Timestamps state file \"%s\" was created with different settings than the current ones, ignoring. "
-                       "Hostname: \"%s\" vs. \"%s\", "
-                       "Username: \"%s\" vs. \"%s\", "
-                       "Remote Path: \"%s\" vs. \"%s\"",
-                       tracking_timestamps_state_filename_.c_str(),
-                       state_hostname, hostname,
-                       state_username, username,
-                       state_remote_path, remote_path);
+    logger_->log_error(
+        "Processor state was persisted with different settings than the current ones, ignoring. "
+        "Listing Strategy: \"%s\" vs. \"%s\", "
+        "Hostname: \"%s\" vs. \"%s\", "
+        "Username: \"%s\" vs. \"%s\", "
+        "Remote Path: \"%s\" vs. \"%s\"",
+        state_listing_strategy, listing_strategy_,
+        state_hostname, hostname,
+        state_username, username,
+        state_remote_path, remote_path);
     return false;
   }
 
@@ -620,11 +589,11 @@ void ListSFTP::listByTrackingTimestamps(
   uint64_t min_timestamp_to_list = last_listed_latest_entry_timestamp_;
 
   /* Load state from cache file if needed */
-  if (!already_loaded_from_cache_ && !tracking_timestamps_state_filename_.empty()) {
-    if (updateFromTrackingTimestampsCache(hostname, username, remote_path)) {
-      logger_->log_debug("Successfully loaded Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+  if (!already_loaded_from_cache_) {
+    if (updateFromTrackingTimestampsCache(context, hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded state");
     } else {
-      logger_->log_debug("Failed to load Tracking Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
+      logger_->log_debug("Failed to load state");
     }
     already_loaded_from_cache_ = true;
   }
@@ -762,9 +731,7 @@ void ListSFTP::listByTrackingTimestamps(
 
     if (latest_listed_entry_timestamp_this_cycle != last_listed_latest_entry_timestamp_ || processed_new_files) {
       last_listed_latest_entry_timestamp_ = latest_listed_entry_timestamp_this_cycle;
-      if (!tracking_timestamps_state_filename_.empty()) {
-        persistTrackingTimestampsCache(hostname, username, remote_path);
-      }
+      persistTrackingTimestampsCache(context, hostname, username, remote_path);
     }
   } else {
     logger_->log_debug("There are no files to list. Yielding.");
@@ -773,140 +740,100 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
-  file.close();
-
-  std::ofstream json_file(tracking_entities_state_json_filename_);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str());
-    return false;
-  }
-
-  rapidjson::Document entities(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
-  for (const auto& already_listed_entity : already_listed_entities_) {
-    rapidjson::Value entity(rapidjson::kObjectType);
-    entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc);
-    entity.AddMember("size", already_listed_entity.second.size, alloc);
-    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = listing_strategy_;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  size_t i = 0;
+  for (const auto &already_listed_entity : already_listed_entities_) {
+    state["entity." + std::to_string(i) + ".name"] = already_listed_entity.first;
+    state["entity." + std::to_string(i) + ".timestamp"] = std::to_string(already_listed_entity.second.timestamp);
+    state["entity." + std::to_string(i) + ".size"] = std::to_string(already_listed_entity.second.size);
+    ++i;
   }
-
-  rapidjson::OStreamWrapper osw(json_file);
-  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
-  entities.Accept(writer);
-
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
-  std::string state_json_state_file;
+  std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
 
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
-      continue;
-    }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "json_state_file") {
-      state_json_state_file = std::move(value);
-    } else {
-      logger_->log_warn("Unknown key found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
-    }
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_debug("Failed to get state from StateManager");
+    return false;
   }
-  file.close();
 
-  if (state_hostname != hostname ||
-      state_username != username ||
-      state_remote_path != remote_path) {
-    logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. "
-                       "Hostname: \"%s\" vs. \"%s\", "
-                       "Username: \"%s\" vs. \"%s\", "
-                       "Remote Path: \"%s\" vs. \"%s\"",
-                       tracking_entities_state_filename_.c_str(),
-                       state_hostname, hostname,
-                       state_username, username,
-                       state_remote_path, remote_path);
+  try {
+    state_listing_strategy = state_map.at("listing_strategy");
+  } catch (...) {
+    logger_->log_error("listing_strategy is missing from state");
     return false;
   }
-
-  if (state_json_state_file.empty()) {
-    logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+  try {
+    state_hostname = state_map.at("hostname");
+  } catch (...) {
+    logger_->log_error("hostname is missing from state");
     return false;
   }
-
-  std::ifstream json_file(state_json_state_file);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to open entities Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
+  try {
+    state_username = state_map.at("username");
+  } catch (...) {
+    logger_->log_error("username is missing from state");
     return false;
   }
-
   try {
-    rapidjson::IStreamWrapper isw(json_file);
-    rapidjson::Document d;
-    rapidjson::ParseResult res = d.ParseStream(isw);
-    if (!res) {
-      logger_->log_error("Failed to parse Tracking Entities state json file \"%s\"", state_json_state_file.c_str());
-      return false;
-    }
-    if (!d.IsObject()) {
-      logger_->log_error("Tracking Entities state json file \"%s\" root is not an object", state_json_state_file.c_str());
-      return false;
-    }
+    state_remote_path = state_map.at("remote_path");
+  } catch (...) {
+    logger_->log_error("remote_path is missing from state");
+    return false;
+  }
 
-    std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
-    for (const auto &already_listed_entity : d.GetObject()) {
-      auto it = already_listed_entity.value.FindMember("timestamp");
-      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" timestamp missing or malformatted for entity \"%s\"",
-            state_json_state_file.c_str(),
-            already_listed_entity.name.GetString());
-        continue;
-      }
-      uint64_t timestamp = it->value.GetUint64();
-      it = already_listed_entity.value.FindMember("size");
-      if (it == already_listed_entity.value.MemberEnd() || !it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" size missing or malformatted for entity \"%s\"",
-                           state_json_state_file.c_str(),
-                           already_listed_entity.name.GetString());
-        continue;
-      }
-      uint64_t size = it->value.GetUint64();
+  size_t i = 0;
+  while (true) {
+    std::string name;
+    try {
+      name = state_map.at("entity." + std::to_string(i) + ".name");
+    } catch (...) {
+      break;
+    }
+    try {
+      uint64_t timestamp = std::stoull(state_map.at("entity." + std::to_string(i) + ".timestamp"));
+      uint64_t size = std::stoull(state_map.at("entity." + std::to_string(i) + ".size"));
       new_already_listed_entities.emplace(std::piecewise_construct,
-                                          std::forward_as_tuple(already_listed_entity.name.GetString()),
+                                          std::forward_as_tuple(name),
                                           std::forward_as_tuple(timestamp, size));
+    } catch (...) {
+      logger_->log_error("State for entity \"%s\" is missing or invalid, skipping", name);
+      /* No 'continue' here: fall through to ++i so a malformed entry is skipped instead of being retried forever */
     }
-    already_listed_entities_ = std::move(new_already_listed_entities);
-  } catch (std::exception& e) {
-    logger_->log_error("Exception while parsing Tracking Entities state json file \"%s\": %s", state_json_state_file.c_str(), e.what());
+    ++i;
+  }
+
+  if (state_listing_strategy != listing_strategy_ ||
+      state_hostname != hostname ||
+      state_username != username ||
+      state_remote_path != remote_path) {
+    logger_->log_error(
+        "Processor state was persisted with different settings than the current ones, ignoring. "
+        "Listing Strategy: \"%s\" vs. \"%s\", "
+        "Hostname: \"%s\" vs. \"%s\", "
+        "Username: \"%s\" vs. \"%s\", "
+        "Remote Path: \"%s\" vs. \"%s\"",
+        state_listing_strategy, listing_strategy_,
+        state_hostname, hostname,
+        state_username, username,
+        state_remote_path, remote_path);
     return false;
   }
 
+  already_listed_entities_ = std::move(new_already_listed_entities);
+
   return true;
 }
 
@@ -920,12 +847,12 @@ void ListSFTP::listByTrackingEntities(
     uint64_t entity_tracking_time_window,
     std::vector<Child>&& files) {
   /* Load state from cache file if needed */
-  if (!already_loaded_from_cache_ && !tracking_entities_state_filename_.empty()) {
-    if (updateFromTrackingEntitiesCache(hostname, username, remote_path)) {
-      logger_->log_debug("Successfully loaded Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+  if (!already_loaded_from_cache_) {
+    if (updateFromTrackingEntitiesCache(context, hostname, username, remote_path)) {
+      logger_->log_debug("Successfully loaded state");
       initial_listing_complete_ = true;
     } else {
-      logger_->log_debug("Failed to load Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+      logger_->log_debug("Failed to load state");
     }
     already_loaded_from_cache_ = true;
   }
@@ -1014,9 +941,7 @@ void ListSFTP::listByTrackingEntities(
 
   initial_listing_complete_ = true;
 
-  if (!tracking_entities_state_filename_.empty()) {
-    persistTrackingEntitiesCache(hostname, username, remote_path);
-  }
+  persistTrackingEntitiesCache(context, hostname, username, remote_path);
 }
 
 void ListSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
diff --git a/extensions/sftp/processors/ListSFTP.h b/extensions/sftp/processors/ListSFTP.h
index 1efb754..e1601ed 100644
--- a/extensions/sftp/processors/ListSFTP.h
+++ b/extensions/sftp/processors/ListSFTP.h
@@ -35,6 +35,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 #include "utils/RegexUtils.h"
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
 #include "../client/SFTPClient.h"
 
 namespace org {
@@ -81,7 +82,6 @@ class ListSFTP : public SFTPProcessorBase {
   static core::Property MaximumFileAge;
   static core::Property MinimumFileSize;
   static core::Property MaximumFileSize;
-  static core::Property StateFile;
 
   // Supported Relationships
   static core::Relationship Success;
@@ -104,6 +104,7 @@ class ListSFTP : public SFTPProcessorBase {
 
  private:
 
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   std::string listing_strategy_;
   bool search_recursively_;
   bool follow_symlink_;
@@ -139,15 +140,12 @@ class ListSFTP : public SFTPProcessorBase {
 
   bool already_loaded_from_cache_;
 
-  std::string tracking_timestamps_state_filename_;
   std::chrono::time_point<std::chrono::steady_clock> last_run_time_;
   uint64_t last_listed_latest_entry_timestamp_;
   uint64_t last_processed_latest_entry_timestamp_;
   std::set<std::string> latest_identifiers_processed_;
 
   bool initial_listing_complete_;
-  std::string tracking_entities_state_filename_;
-  std::string tracking_entities_state_json_filename_;
   struct ListedEntity {
     uint64_t timestamp;
     uint64_t size;
@@ -170,11 +168,11 @@ class ListSFTP : public SFTPProcessorBase {
       const std::string& username,
       const Child& child);
 
-  bool persistTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
-  bool updateFromTrackingTimestampsCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool persistTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingTimestampsCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
 
-  bool persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
-  bool updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
+  bool updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path);
 
   void listByTrackingTimestamps(
       const std::shared_ptr<core::ProcessContext>& context,
diff --git a/extensions/sftp/tests/CMakeLists.txt b/extensions/sftp/tests/CMakeLists.txt
index e12970c..e9936d3 100644
--- a/extensions/sftp/tests/CMakeLists.txt
+++ b/extensions/sftp/tests/CMakeLists.txt
@@ -46,6 +46,9 @@ if (NOT SKIP_TESTS AND Java_FOUND AND Maven_FOUND AND NOT DISABLE_EXPRESSION_LAN
 		target_wholearchive_library(${testfilename} minifi-sftp)
 		target_wholearchive_library(${testfilename} minifi-expression-language-extensions)
 		target_wholearchive_library(${testfilename} minifi-standard-processors)
+		if (NOT DISABLE_ROCKSDB)
+			target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+		endif()
 
 		MATH(EXPR SFTP-EXTENSIONS_TEST_COUNT "${SFTP-EXTENSIONS_TEST_COUNT}+1")
 		add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
diff --git a/extensions/sftp/tests/FetchSFTPTests.cpp b/extensions/sftp/tests/FetchSFTPTests.cpp
index d7e95c7..89fea7c 100644
--- a/extensions/sftp/tests/FetchSFTPTests.cpp
+++ b/extensions/sftp/tests/FetchSFTPTests.cpp
@@ -59,8 +59,8 @@
 class FetchSFTPTestsFixture {
  public:
   FetchSFTPTestsFixture()
-  : src_dir(strdup("/tmp/sftps.XXXXXX"))
-  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+  : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
     LogTestController::getInstance().setTrace<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
     LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
diff --git a/extensions/sftp/tests/ListSFTPTests.cpp b/extensions/sftp/tests/ListSFTPTests.cpp
index f82b010..74e06cb 100644
--- a/extensions/sftp/tests/ListSFTPTests.cpp
+++ b/extensions/sftp/tests/ListSFTPTests.cpp
@@ -58,7 +58,7 @@
 class ListSFTPTestsFixture {
  public:
   ListSFTPTestsFixture()
-  : src_dir(strdup("/tmp/sftps.XXXXXX")) {
+  : src_dir(strdup("/var/tmp/sftps.XXXXXX")) {
     LogTestController::getInstance().setTrace<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
     LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -89,7 +89,13 @@ class ListSFTPTestsFixture {
   }
 
   void createPlan(utils::Identifier* list_sftp_uuid = nullptr) {
-    plan = testController.createPlan();
+    const std::string state_dir = plan == nullptr ? "" : plan->getStateDir();
+
+    log_attribute.reset();
+    list_sftp.reset();
+    plan.reset();
+
+    plan = testController.createPlan(nullptr /*config*/, state_dir.empty() ? nullptr : state_dir.c_str());
     if (list_sftp_uuid == nullptr) {
       list_sftp = plan->addProcessor(
           "ListSFTP",
@@ -124,7 +130,6 @@ class ListSFTPTestsFixture {
     plan->setProperty(list_sftp, "Minimum File Size", "0 B");
     plan->setProperty(list_sftp, "Target System Timestamp Precision", "Seconds");
     plan->setProperty(list_sftp, "Remote Path", "nifi_test/");
-    plan->setProperty(list_sftp, "State File", std::string(src_dir) + "/state");
 
     // Configure LogAttribute processor
     plan->setProperty(log_attribute, "FlowFiles To Log", "0");
@@ -546,6 +551,16 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
 
   testController.runSession(plan, true);
 
+  std::unordered_map<std::string, std::string> state;
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+  REQUIRE("nifi_test/file1.ext" == state.at("id.0"));
+  REQUIRE(!state.at("listing.timestamp").empty());
+  REQUIRE(!state.at("processed.timestamp").empty());
+
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
 
@@ -559,9 +574,19 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Timestamps state file"));
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+  REQUIRE("nifi_test/file2.ext" == state.at("id.0"));
+  REQUIRE(!state.at("listing.timestamp").empty());
+  REQUIRE(!state.at("processed.timestamp").empty());
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+  REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
 }
 
 TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore state changed configuration", "[ListSFTP][tracking-timestamps]") {
@@ -574,6 +599,13 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
 
+  std::unordered_map<std::string, std::string> state;
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+
   utils::Identifier list_sftp_uuid;
   REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
   createPlan(&list_sftp_uuid);
@@ -585,7 +617,14 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps restore sta
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("/nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Timestamps" == state.at("listing_strategy"));
+
+  REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+                                                    "Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
                                                     "Hostname: \"localhost\" vs. \"localhost\", "
                                                     "Username: \"nifiuser\" vs. \"nifiuser\", "
                                                     "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
@@ -612,10 +651,11 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Timestamps changed con
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
-  "Hostname: \"localhost\" vs. \"localhost\", "
-  "Username: \"nifiuser\" vs. \"nifiuser\", "
-  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+                                                    "Listing Strategy: \"Tracking Timestamps\" vs. \"Tracking Timestamps\", "
+                                                    "Hostname: \"localhost\" vs. \"localhost\", "
+                                                    "Username: \"nifiuser\" vs. \"nifiuser\", "
+                                                    "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
@@ -814,13 +854,41 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
   plan->setProperty(list_sftp, "Listing Strategy", "Tracking Entities");
   LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
+  std::unordered_map<std::string, std::string> state;
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+  REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
+  REQUIRE("14" == state.at("entity.0.size"));
+  REQUIRE(!state.at("entity.0.timestamp").empty());
+
   createFileWithModificationTimeDiff("nifi_test/file2.ext", "Test content 2", -240 /* 4 minutes ago */);
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("Successfully loaded Tracking Entities state file"));
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+  if (state.at("entity.0.name") == "nifi_test/file1.ext") {
+    REQUIRE("nifi_test/file1.ext" == state.at("entity.0.name"));
+    REQUIRE("nifi_test/file2.ext" == state.at("entity.1.name"));
+  } else {
+    REQUIRE("nifi_test/file2.ext" == state.at("entity.0.name"));
+    REQUIRE("nifi_test/file1.ext" == state.at("entity.1.name"));
+  }
+  REQUIRE("14" == state.at("entity.0.size"));
+  REQUIRE(!state.at("entity.0.timestamp").empty());
+  REQUIRE("14" == state.at("entity.1.size"));
+  REQUIRE(!state.at("entity.1.timestamp").empty());
+
+  REQUIRE(LogTestController::getInstance().contains("Successfully loaded state"));
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
+  REQUIRE(!LogTestController::getInstance().contains("key:filename value:file1.ext", std::chrono::seconds(0)));
 }
 
 TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state changed configuration", "[ListSFTP][tracking-entities]") {
@@ -833,6 +901,13 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
 
+  std::unordered_map<std::string, std::string> state;
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+
   utils::Identifier list_sftp_uuid;
   REQUIRE(true == list_sftp->getUUID(list_sftp_uuid));
   createPlan(&list_sftp_uuid);
@@ -844,10 +919,17 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities restore state
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
-  "Hostname: \"localhost\" vs. \"localhost\", "
-  "Username: \"nifiuser\" vs. \"nifiuser\", "
-  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*list_sftp)->get(state));
+  REQUIRE("localhost" == state.at("hostname"));
+  REQUIRE("nifiuser" == state.at("username"));
+  REQUIRE("/nifi_test" == state.at("remote_path"));
+  REQUIRE("Tracking Entities" == state.at("listing_strategy"));
+
+  REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+                                                    "Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
+                                                    "Hostname: \"localhost\" vs. \"localhost\", "
+                                                    "Username: \"nifiuser\" vs. \"nifiuser\", "
+                                                    "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
@@ -871,10 +953,11 @@ TEST_CASE_METHOD(ListSFTPTestsFixture, "ListSFTP Tracking Entities changed confi
 
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("was created with different settings than the current ones, ignoring. "
-  "Hostname: \"localhost\" vs. \"localhost\", "
-  "Username: \"nifiuser\" vs. \"nifiuser\", "
-  "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
+  REQUIRE(LogTestController::getInstance().contains("Processor state was persisted with different settings than the current ones, ignoring. "
+                                                    "Listing Strategy: \"Tracking Entities\" vs. \"Tracking Entities\", "
+                                                    "Hostname: \"localhost\" vs. \"localhost\", "
+                                                    "Username: \"nifiuser\" vs. \"nifiuser\", "
+                                                    "Remote Path: \"nifi_test\" vs. \"/nifi_test\""));
   REQUIRE(LogTestController::getInstance().contains("from ListSFTP to relationship success"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file1.ext"));
   REQUIRE(LogTestController::getInstance().contains("key:filename value:file2.ext"));
diff --git a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
index 0282cb9..f5155ec 100644
--- a/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
+++ b/extensions/sftp/tests/ListThenFetchSFTPTests.cpp
@@ -57,8 +57,8 @@
 class ListThenFetchSFTPTestsFixture {
  public:
   ListThenFetchSFTPTestsFixture()
-  : src_dir(strdup("/tmp/sftps.XXXXXX"))
-  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+  : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
     LogTestController::getInstance().setTrace<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
     LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
diff --git a/extensions/sftp/tests/PutSFTPTests.cpp b/extensions/sftp/tests/PutSFTPTests.cpp
index 4d13a11..3c8bb1f 100644
--- a/extensions/sftp/tests/PutSFTPTests.cpp
+++ b/extensions/sftp/tests/PutSFTPTests.cpp
@@ -60,8 +60,8 @@
 class PutSFTPTestsFixture {
  public:
   PutSFTPTestsFixture()
-  : src_dir(strdup("/tmp/sftps.XXXXXX"))
-  , dst_dir(strdup("/tmp/sftpd.XXXXXX")) {
+  : src_dir(strdup("/var/tmp/sftps.XXXXXX"))
+  , dst_dir(strdup("/var/tmp/sftpd.XXXXXX")) {
     LogTestController::getInstance().setTrace<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
     LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -736,7 +736,7 @@ TEST_CASE_METHOD(PutSFTPTestsFixture, "PutSFTP connection caching reaches limit"
   std::vector<std::vector<char>> dst_dirs;
   std::vector<std::unique_ptr<SFTPTestServer>> sftp_servers;
 
-  std::string tmp_dir_format("/tmp/sftpd.XXXXXX");
+  std::string tmp_dir_format("/var/tmp/sftpd.XXXXXX");
   for (size_t i = 0; i < 10; i++) {
     dst_dirs.emplace_back(tmp_dir_format.data(), tmp_dir_format.data() + tmp_dir_format.size() + 1);
     testController.createTempDirectory(dst_dirs.back().data());
diff --git a/extensions/sql/processors/ExecuteSQL.cpp b/extensions/sql/processors/ExecuteSQL.cpp
index 7738408..fe0df48 100644
--- a/extensions/sql/processors/ExecuteSQL.cpp
+++ b/extensions/sql/processors/ExecuteSQL.cpp
@@ -81,14 +81,14 @@ void ExecuteSQL::initialize() {
   setSupportedRelationships( { s_success });
 }
 
-void ExecuteSQL::processOnSchedule(const core::ProcessContext &context) {
+void ExecuteSQL::processOnSchedule(core::ProcessContext &context) {
   initOutputFormat(context);
 
   context.getProperty(s_sqlSelectQuery.getName(), sqlSelectQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), max_rows_);
 }
 
-void ExecuteSQL::processOnTrigger(core::ProcessSession &session) {
+void ExecuteSQL::processOnTrigger(core::ProcessSession& session) {
   auto statement = connection_->prepareStatement(sqlSelectQuery_);
 
   auto rowset = statement->execute();
diff --git a/extensions/sql/processors/ExecuteSQL.h b/extensions/sql/processors/ExecuteSQL.h
index 764f038..2a9773d 100644
--- a/extensions/sql/processors/ExecuteSQL.h
+++ b/extensions/sql/processors/ExecuteSQL.h
@@ -46,9 +46,9 @@ class ExecuteSQL: public SQLProcessor<ExecuteSQL>, public OutputFormat {
   //! Processor Name
   static const std::string ProcessorName;
 
-  void processOnSchedule(const core::ProcessContext& context);
+  void processOnSchedule(core::ProcessContext& context);
   void processOnTrigger(core::ProcessSession& session);
-  
+
   void initialize() override;
 
   static const core::Property s_sqlSelectQuery;
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index 0512cc9..5296760 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -73,7 +73,7 @@ void PutSQL::initialize() {
   setSupportedRelationships( { s_success });
 }
 
-void PutSQL::processOnSchedule(const core::ProcessContext& context) {
+void PutSQL::processOnSchedule(core::ProcessContext& context) {
   std::string sqlStatements;
   context.getProperty(s_sqlStatements.getName(), sqlStatements);
   sqlStatements_ = utils::StringUtils::split(sqlStatements, ";");
diff --git a/extensions/sql/processors/PutSQL.h b/extensions/sql/processors/PutSQL.h
index 284008a..6c5494d 100644
--- a/extensions/sql/processors/PutSQL.h
+++ b/extensions/sql/processors/PutSQL.h
@@ -45,7 +45,7 @@ class PutSQL: public SQLProcessor<PutSQL> {
   //! Processor Name
   static const std::string ProcessorName;
 
-  void processOnSchedule(const core::ProcessContext &context);
+  void processOnSchedule(core::ProcessContext &context);
   void processOnTrigger(core::ProcessSession &session);
   
   void initialize() override;
diff --git a/extensions/sql/processors/QueryDatabaseTable.cpp b/extensions/sql/processors/QueryDatabaseTable.cpp
index a7d7033..7cf7a41 100644
--- a/extensions/sql/processors/QueryDatabaseTable.cpp
+++ b/extensions/sql/processors/QueryDatabaseTable.cpp
@@ -25,7 +25,7 @@
 #include <map>
 #include <set>
 #include <sstream>
-#include <stdio.h>
+#include <cstdio>
 #include <string>
 #include <iostream>
 #include <memory>
@@ -93,7 +93,7 @@ const core::Property QueryDatabaseTable::s_maxRowsPerFlowFile(
     "If the value specified is zero, then all rows are returned in a single FlowFile.")->supportsExpressionLanguage(true)->build());
 
 const core::Property QueryDatabaseTable::s_stateDirectory(
-  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("Directory which contains processor state data.")->build());
+  core::PropertyBuilder::createProperty("State Directory")->isRequired(false)->withDefaultValue("QDTState")->withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->build());
 
 const std::string QueryDatabaseTable::s_initialMaxValueDynamicPropertyPrefix("initial.maxvalue.");
 
@@ -102,15 +102,18 @@ const core::Relationship QueryDatabaseTable::s_success("success", "Successfully
 static const std::string ResultTableName = "tablename";
 static const std::string ResultRowCount = "querydbtable.row.count";
 
+static const std::string TABLENAME_KEY = "tablename";
+static const std::string MAXVALUE_KEY_PREFIX = "maxvalue.";
+
 // State
-class State {
+class LegacyState {
  public:
-  State(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
+  LegacyState(const std::string& tableName, const std::string& stateDir, const std::string& uuid, std::shared_ptr<logging::Logger> logger)
     :tableName_(tableName), logger_(logger) {
-    if (!createUUIDDir(stateDir, uuid, filePath_))
-      return;
 
-    filePath_ += "State.txt";
+    filePath_ = utils::file::FileUtils::concat_path(
+            utils::file::FileUtils::concat_path(
+              utils::file::FileUtils::concat_path(stateDir, "uuid"), uuid), "State.txt");
 
     if (!getStateFromFile())
       return;
@@ -118,79 +121,30 @@ class State {
     ok_ = true;
   }
 
-  ~State() {}
-
   explicit operator bool() const {
     return ok_;
   }
 
-  std::unordered_map<std::string, std::string> mapState() const {
+  const std::unordered_map<std::string, std::string>& getStateMap() const {
     return mapState_;
   }
 
-  void writeStateToFile(const std::unordered_map<std::string, std::string>& mapState) {
-    file_.seekp(std::ios::beg);
-
-    file_ << tableName_ << separator();
-    auto dataSize = tableName_.size() + separator().size();
-
-    for (const auto& el : mapState) {
-      file_ << el.first << '=' << el.second << separator();
-      dataSize += el.first.size() + 1 + el.second.size() + separator().size();
+  bool moveStateFileToMigrated() {
+    if (!ok_) {
+      return false;
     }
-
-    // If a maxValueColumnName type is varchar, a new max value 'dataSize' can be shorter than previous max value 'dataSize_' - clear difference with ' ' to keep file format.
-    if (dataSize_ > dataSize) {
-      for (auto i = dataSize_ - dataSize; i > 0; i--) {
-        file_ << ' ';
-      }
-    }
-    dataSize_ = dataSize;
-
-    file_.flush();
-
-    mapState_ = mapState;
+    return rename(filePath_.c_str(), (filePath_ + "-migrated").c_str()) == 0;
   }
 
  private:
-   static const std::string& separator() {
-     static const std::string s_separator = "@!qdt!@";
-     return s_separator;
-   }
-
-   bool createUUIDDir(const std::string& stateDir, const std::string& uuid, std::string& dir)
-   {
-     if (stateDir.empty()) {
-       dir.clear();
-       return false;
-     }
-
-     const auto dirSeparator = utils::file::FileUtils::get_separator();
-
-     auto dirWithSlash = stateDir;
-     if (stateDir.back() != dirSeparator) {
-       dirWithSlash += dirSeparator;
-     }
-
-     dir = dirWithSlash + "uuid" + dirSeparator + uuid + dirSeparator;
-
-     utils::file::FileUtils::create_dir(dir);
-
-     if (!utils::file::FileUtils::is_directory(dir.c_str())) {
-       logger_->log_error("Cannot create %s", dir.c_str());
-       dir.clear();
-       return false;
-     }
-
-     return true;
-   }
+  static const std::string separator_;
 
    bool getStateFromFile() {
      std::string state;
 
      std::ifstream file(filePath_);
      if (!file) {
-       return createEmptyStateFile();
+       return false;
      }
 
      std::stringstream ss;
@@ -198,17 +152,15 @@ class State {
 
      state = ss.str();
 
-     dataSize_ = state.size();
-
      file.close();
 
      std::vector<std::string> listColumnNameValue;
 
-     size_t pos = state.find(separator(), 0);
+     size_t pos = state.find(separator_, 0);
      if (pos == std::string::npos) {
        logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
        mapState_.clear();
-       return createEmptyStateFile();
+       return false;
      }
 
      auto tableName = state.substr(0, pos);
@@ -216,20 +168,20 @@ class State {
        logger_->log_warn("tableName is changed - now: '%s', in State.txt: '%s'.", tableName_.c_str(), tableName.c_str());
        mapState_.clear();
 
-       return createEmptyStateFile();
+       return false;
      }
 
-     pos += separator().size();
+     pos += separator_.size();
 
      while (true) {
-       auto newPos = state.find(separator(), pos);
+       auto newPos = state.find(separator_, pos);
        if (newPos == std::string::npos)
          break;
 
        const std::string& columnNameValue = state.substr(pos, newPos - pos);
        listColumnNameValue.emplace_back(columnNameValue);
 
-       pos = newPos + separator().size();
+       pos = newPos + separator_.size();
      }
 
      for (const auto& columnNameValue : listColumnNameValue) {
@@ -237,7 +189,7 @@ class State {
        if (posEQ == std::string::npos) {
          logger_->log_error("Invalid data in '%s' file.", filePath_.c_str());
          mapState_.clear();
-         return createEmptyStateFile();
+         return false;
        }
 
        const auto& name = columnNameValue.substr(0, posEQ);
@@ -246,25 +198,6 @@ class State {
        mapState_.insert({ name, value });
      }
 
-     file_.open(filePath_);
-     if (!file_.is_open()) {
-       logger_->log_error("Cannot open %s", filePath_.c_str());
-       mapState_.clear();
-       return false;
-     }
-
-     return true;
-   }
-
-   bool createEmptyStateFile() {
-     file_.open(filePath_, std::ios::out);
-     if (!file_.is_open()) {
-       logger_->log_error("Cannot open '%s' file", filePath_.c_str());
-       return false;
-     }
-
-     dataSize_ = 0;
-
      return true;
    }
 
@@ -272,12 +205,11 @@ class State {
    std::unordered_map<std::string, std::string> mapState_;
    std::shared_ptr<logging::Logger> logger_;
    std::string filePath_;
-   std::fstream file_;
-   size_t dataSize_{};
    std::string tableName_;
    bool ok_{};
 };
 
+const std::string LegacyState::separator_ = "@!qdt!@";
 
 // QueryDatabaseTable
 QueryDatabaseTable::QueryDatabaseTable(const std::string& name, utils::Identifier uuid)
@@ -295,7 +227,7 @@ void QueryDatabaseTable::initialize() {
   setSupportedRelationships( { s_success });
 }
 
-void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context) {
+void QueryDatabaseTable::processOnSchedule(core::ProcessContext &context) {
   initOutputFormat(context);
 
   context.getProperty(s_tableName.getName(), tableName_);
@@ -308,19 +240,43 @@ void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context)
   context.getProperty(s_sqlQuery.getName(), sqlQuery_);
   context.getProperty(s_maxRowsPerFlowFile.getName(), maxRowsPerFlowFile_);
 
-  std::string stateDir;
-  context.getProperty(s_stateDirectory.getName(), stateDir);
-  if (stateDir.empty()) {
-    logger_->log_error("State Directory is empty");
-    return;
+  mapState_.clear();
+
+  state_manager_ = context.getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
 
-  pState_ = std::make_unique<State>(tableName_, stateDir, getUUIDStr(), logger_);
-  if (!*pState_) {
-    return;
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map)) {
+    if (state_map[TABLENAME_KEY] != tableName_) {
+      state_manager_->clear();
+    } else {
+      for (auto&& elem : state_map) {
+        if (elem.first.find(MAXVALUE_KEY_PREFIX) == 0) {
+          mapState_.emplace(elem.first.substr(MAXVALUE_KEY_PREFIX.length()), std::move(elem.second));
+        }
+      }
+    }
+  } else {
+    // state_manager_->get() returned false: attempt a one-time import of the legacy State.txt file.
+    std::string stateDir;
+    context.getProperty(s_stateDirectory.getName(), stateDir);
+    if (!stateDir.empty()) {
+      LegacyState legacyState(tableName_, stateDir, getUUIDStr(), logger_);
+      if (legacyState) {
+        mapState_ = legacyState.getStateMap();
+        if (saveState() && state_manager_->persist()) {
+          logger_->log_info("State migration successful");
+          legacyState.moveStateFileToMigrated();  // renames the file with a "-migrated" suffix
+        } else {
+          logger_->log_warn("Failed to persist migrated state");
+        }
+      } else {
+        logger_->log_warn("Could not migrate state from specified State Directory %s", stateDir.c_str());
+      }
+    }
   }
-  
-  mapState_ = pState_->mapState();
 
   // If 'listMaxValueColumnName_' doesn't match columns in mapState_, then clear mapState_.
   if (listMaxValueColumnName_.size() != mapState_.size()) {
@@ -366,7 +322,7 @@ void QueryDatabaseTable::processOnSchedule(const core::ProcessContext &context)
   }
 }
 
-void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
+void QueryDatabaseTable::processOnTrigger(core::ProcessSession& session) {
   const auto& selectQuery = getSelectQuery();
 
   logger_->log_info("QueryDatabaseTable: selectQuery: '%s'", selectQuery.c_str());
@@ -408,7 +364,7 @@ void QueryDatabaseTable::processOnTrigger(core::ProcessSession &session) {
       throw;
     }
 
-    pState_->writeStateToFile(mapState_);
+    saveState();
   }
 }
 
@@ -463,6 +419,15 @@ std::string QueryDatabaseTable::getSelectQuery() {
   return ret;
 }
 
+bool QueryDatabaseTable::saveState() {
+  // Stored layout: the table name plus one MAXVALUE_KEY_PREFIX-prefixed entry per column in mapState_.
+  std::unordered_map<std::string, std::string> persisted{{TABLENAME_KEY, tableName_}};
+  for (const auto& column_max : mapState_) {
+    persisted.emplace(MAXVALUE_KEY_PREFIX + column_max.first, column_max.second);
+  }
+  return state_manager_->set(persisted);
+}
+
 
 } /* namespace processors */
 } /* namespace minifi */
diff --git a/extensions/sql/processors/QueryDatabaseTable.h b/extensions/sql/processors/QueryDatabaseTable.h
index 412faab..44f1ed7 100644
--- a/extensions/sql/processors/QueryDatabaseTable.h
+++ b/extensions/sql/processors/QueryDatabaseTable.h
@@ -65,15 +65,18 @@ class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public Output
     return true;
   }
 
-  void processOnSchedule(const core::ProcessContext& context);
+  void processOnSchedule(core::ProcessContext& context);
   void processOnTrigger(core::ProcessSession& session);
-  
+
   void initialize() override;
 
  private:
   std::string getSelectQuery();
 
+  bool saveState();
+
  private:
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   std::string tableName_;
   std::string columnNames_;
   std::string maxValueColumnNames_;
@@ -83,7 +86,6 @@ class QueryDatabaseTable: public SQLProcessor<QueryDatabaseTable>, public Output
   std::vector<std::string> listMaxValueColumnName_;
   std::unordered_map<std::string, std::string> mapState_;
   std::unordered_map<std::string, soci::data_type> mapColumnType_;
-  std::unique_ptr<State> pState_;
 };
 
 REGISTER_RESOURCE(QueryDatabaseTable, "QueryDatabaseTable to execute SELECT statement via ODBC.");
diff --git a/extensions/standard-processors/CMakeLists.txt b/extensions/standard-processors/CMakeLists.txt
index d49a29f..f4ca71d 100644
--- a/extensions/standard-processors/CMakeLists.txt
+++ b/extensions/standard-processors/CMakeLists.txt
@@ -20,7 +20,7 @@
 
 include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
 
-file(GLOB SOURCES  "processors/*.cpp" )
+file(GLOB SOURCES  "processors/*.cpp" "controllers/*.cpp" )
 
 add_library(minifi-standard-processors STATIC ${SOURCES})
 set_property(TARGET minifi-standard-processors PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
new file mode 100644
index 0000000..bd654a3
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.cpp
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Builds the store from a name and a textual id; only the base class and the logger are set up.
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::string& id)
+    : KeyValueStoreService(name, id),
+      logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {}
+
+// Builds the store from a name and an Identifier; only the base class and the logger are set up.
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : KeyValueStoreService(name, uuid),
+      logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {}
+
+// Configuration-based construction: records the configuration, then runs initialize().
+UnorderedMapKeyValueStoreService::UnorderedMapKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure> &configuration)
+    : KeyValueStoreService(name), logger_(logging::LoggerFactory<UnorderedMapKeyValueStoreService>::getLogger()) {
+  setConfiguration(configuration);
+  initialize();
+}
+
+// No explicit clean-up is performed here; the members destroy themselves.
+UnorderedMapKeyValueStoreService::~UnorderedMapKeyValueStoreService() = default;
+
+bool UnorderedMapKeyValueStoreService::set(const std::string& key, const std::string& value) {
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  map_[key] = value;  // inserts the key or overwrites its previous value
+  return true;  // this implementation always reports success
+}
+
+// Copies the value stored under `key` into `value`; returns false and leaves `value` untouched if absent.
+bool UnorderedMapKeyValueStoreService::get(const std::string& key, std::string& value) {
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  const auto entry = map_.find(key);
+  if (entry == map_.end()) {
+    return false;
+  }
+  value = entry->second;
+  return true;
+}
+
+bool UnorderedMapKeyValueStoreService::get(std::unordered_map<std::string, std::string>& kvs) {
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  kvs = map_;  // full snapshot; replaces whatever the caller's map held before
+  return true;  // this implementation always reports success
+}
+
+bool UnorderedMapKeyValueStoreService::remove(const std::string& key) {
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  return map_.erase(key) > 0U;  // keys are unique, so erase() yields 0 or 1
+}
+
+bool UnorderedMapKeyValueStoreService::clear() {
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  map_.clear();  // drops every stored entry
+  return true;  // this implementation always reports success
+}
+
+// Read-modify-write of one key under the lock; returns false if update_func declines or throws.
+bool UnorderedMapKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  // Work on a copy of the current value; the map is written only if update_func accepts.
+  std::string value;
+  const auto found = map_.find(key);
+  const bool exists = found != map_.end();
+  if (exists) {
+    value = found->second;
+  }
+  try {
+    if (!update_func(exists, value)) {
+      return false;
+    }
+  } catch (const std::exception& e) {
+    logger_->log_error("update_func failed with an exception: %s", e.what());
+    return false;
+  } catch (...) {
+    logger_->log_error("update_func failed with an exception");
+    return false;
+  }
+  // mutex_ is recursive, so update_func may re-enter this store and invalidate
+  // `found`; look the key up again instead of reusing the iterator.
+  map_[key] = std::move(value);
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
new file mode 100644
index 0000000..e41e915
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapKeyValueStoreService.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__
+#define __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__
+
+#include "controllers/keyvalue/KeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <unordered_map>
+#include <string>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class UnorderedMapKeyValueStoreService : virtual public KeyValueStoreService {
+ public:
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  explicit UnorderedMapKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure>& configuration);
+  virtual ~UnorderedMapKeyValueStoreService();
+
+  // Stores value under key, overwriting any existing entry.
+  bool set(const std::string& key, const std::string& value) override;
+  // Copies the value stored under key into value; returns false if the key is absent.
+  bool get(const std::string& key, std::string& value) override;
+  // Copies every stored pair into kvs.
+  bool get(std::unordered_map<std::string, std::string>& kvs) override;
+  // Erases key; returns true only if an entry was actually removed.
+  bool remove(const std::string& key) override;
+  // Removes all entries.
+  bool clear() override;
+  // Passes the current value (if any) to update_func and stores the result when it returns true.
+  bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
+ protected:
+  std::unordered_map<std::string, std::string> map_;
+  std::recursive_mutex mutex_;  // locked by every public operation before touching map_
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(UnorderedMapKeyValueStoreService, "A key-value service implemented by a locked std::unordered_map<std::string, std::string>");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* __UNORDERED_MAP_KEY_VALUE_STORE_SERVICE_H__ */
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..b41baf1
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.cpp
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "UnorderedMapPersistableKeyValueStoreService.h"
+
+#include <fstream>
+
+#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+constexpr int UnorderedMapPersistableKeyValueStoreService::FORMAT_VERSION;
+
+core::Property UnorderedMapPersistableKeyValueStoreService::File(
+    core::PropertyBuilder::createProperty("File")->withDescription("Path to a file to store state")
+        ->isRequired(true)->build());
+
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::string& id)
+    : KeyValueStoreService(name, id)  // named explicitly: UnorderedMapKeyValueStoreService inherits this base virtually
+    , AbstractAutoPersistingKeyValueStoreService(name, id)
+    , UnorderedMapKeyValueStoreService(name, id)
+    , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : KeyValueStoreService(name, uuid)  // same initialization order as the (name, id) overload, with a UUID instead of a string id
+    , AbstractAutoPersistingKeyValueStoreService(name, uuid)
+    , UnorderedMapKeyValueStoreService(name, uuid)
+    , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger()) {
+}
+
+UnorderedMapPersistableKeyValueStoreService::UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure> &configuration)
+    : KeyValueStoreService(name)
+    , AbstractAutoPersistingKeyValueStoreService(name)
+    , UnorderedMapKeyValueStoreService(name)
+    , logger_(logging::LoggerFactory<UnorderedMapPersistableKeyValueStoreService>::getLogger())  {
+  setConfiguration(configuration);  // NOTE(review): presumably stores configuration_, which onEnable() requires to be non-null - confirm
+  initialize();  // called from a constructor, so this resolves to this class's initialize(), which registers the File property
+}
+
+UnorderedMapPersistableKeyValueStoreService::~UnorderedMapPersistableKeyValueStoreService() {
+  persist();  // write the current contents of map_ to file_ one last time before the object goes away
+}
+
+std::string UnorderedMapPersistableKeyValueStoreService::escape(const std::string& str) {
+  // Produces the on-disk form of a key or a value. persist() writes one "key=value"
+  // record per line, so the characters with a meaning in that format are rewritten:
+  // a backslash becomes two backslashes, a newline becomes backslash + 'n' and
+  // '=' becomes backslash + '='. Everything else is copied unchanged.
+  std::string escaped;
+  escaped.reserve(str.size());
+  for (const char c : str) {
+    if (c == '\\') {
+      escaped += "\\\\";
+    } else if (c == '\n') {
+      escaped += "\\n";
+    } else if (c == '=') {
+      escaped += "\\=";
+    } else {
+      escaped += c;
+    }
+  }
+  return escaped;
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::parseLine(const std::string& line, std::string& key, std::string& value) {  // splits one persisted line at its unescaped '=' and undoes escape(); logs and returns false on malformed input
+  std::stringstream key_ss;
+  std::stringstream value_ss;
+  bool in_escape_sequence = false;  // true while the previous character was an unescaped backslash
+  bool key_complete = false;  // becomes true once the separating '=' has been consumed
+  for (const auto c : line) {
+    auto& current = key_complete ? value_ss : key_ss;  // characters go to the key until the separator, then to the value
+    if (in_escape_sequence) {
+      switch (c) {
+        case '\\':
+          current << '\\';
+          break;
+        case 'n':
+          current << '\n';
+          break;
+        case '=':
+          current << '=';
+          break;
+        default:
+          logger_->log_error("Invalid escape sequence in \"%s\": \"\\%c\"", line.c_str(), c);
+          return false;
+      }
+      in_escape_sequence = false;
+    } else {
+      if (c == '\\') {
+        in_escape_sequence = true;
+      } else if (c == '=') {
+        if (key_complete) {  // a second unescaped '=' cannot be produced by escape(), so the line is rejected
+          logger_->log_error("Unterminated \'=\' in line \"%s\"", line.c_str());
+          return false;
+        } else {
+          key_complete = true;
+        }
+      } else {
+        current << c;
+      }
+    }
+  }
+  if (in_escape_sequence) {  // the line ended directly after a backslash
+    logger_->log_error("Unterminated escape sequence in \"%s\"", line.c_str());
+    return false;
+  }
+  if (!key_complete) {  // no unescaped '=' was found at all
+    logger_->log_error("Key not found in \"%s\"", line.c_str());
+    return false;
+  }
+  key = key_ss.str();  // note: key is overwritten before the emptiness check, so it is modified even if that check fails
+  if (key.empty()) {
+    logger_->log_error("Line with empty key found in \"%s\": \"%s\"", file_.c_str(), line.c_str());
+    return false;
+  }
+  value = value_ss.str();  // an empty value is accepted
+  return true;
+}
+
+void UnorderedMapPersistableKeyValueStoreService::initialize() {
+  AbstractAutoPersistingKeyValueStoreService::initialize();  // base class setup runs first
+  std::set<core::Property> supportedProperties;
+  supportedProperties.insert(File);  // File is the only property this class contributes
+  updateSupportedProperties(supportedProperties);  // NOTE(review): presumably adds to the properties registered by the base class - confirm
+}
+
+void UnorderedMapPersistableKeyValueStoreService::onEnable() {  // reads the File property, loads any stored state, then enables the base class
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable UnorderedMapPersistableKeyValueStoreService");
+    return;
+  }
+
+  if (!getProperty(File.getName(), file_)) {  // on success file_ holds the path used by load() and persist()
+    logger_->log_error("Invalid or missing property: File");
+    return;
+  }
+
+  /* We must not start the persistence thread until we attempted to load the state */
+  load();  // the result is not checked: enabling continues whether or not state could be loaded
+
+  AbstractAutoPersistingKeyValueStoreService::onEnable();
+
+  logger_->log_trace("Enabled UnorderedMapPersistableKeyValueStoreService");
+}
+
+void UnorderedMapPersistableKeyValueStoreService::notifyStop() {
+  AbstractAutoPersistingKeyValueStoreService::notifyStop();  // the base class is notified before the final write
+  persist();  // write the state once more on stop; the result is not checked
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::set(const std::string& key, const std::string& value) {
+  // The lock is held across the in-memory change and the optional write to file_.
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (!UnorderedMapKeyValueStoreService::set(key, value)) {
+    return false;
+  }
+  return !always_persist_ || persist();  // with always_persist_ set, the result is that of persist()
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::remove(const std::string& key) {
+  // The lock is held across the in-memory removal and the optional write to file_.
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (!UnorderedMapKeyValueStoreService::remove(key)) {
+    return false;
+  }
+  return !always_persist_ || persist();  // with always_persist_ set, the result is that of persist()
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::clear() {
+  // The lock is held across the in-memory clear and the optional write to file_.
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (!UnorderedMapKeyValueStoreService::clear()) {
+    return false;
+  }
+  return !always_persist_ || persist();  // with always_persist_ set, the result is that of persist()
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) {
+  // The lock is held across the in-memory update and the optional write to file_.
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (!UnorderedMapKeyValueStoreService::update(key, update_func)) {
+    return false;
+  }
+  return !always_persist_ || persist();  // with always_persist_ set, the result is that of persist()
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::persist() {  // rewrites file_ from map_; returns false if the file cannot be opened or written
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::ofstream ofs(file_);  // default open mode: any existing content of file_ is discarded
+  if (!ofs.is_open()) {
+    logger_->log_error("Failed to open file \"%s\" to store state", file_.c_str());
+    return false;
+  }
+  ofs << escape(FORMAT_VERSION_KEY) << "=" << escape(std::to_string(FORMAT_VERSION)) << "\n";  // version record, checked by load()
+  for (const auto& kv : map_) {
+    ofs << escape(kv.first) << "=" << escape(kv.second) << "\n";
+  }
+  return !ofs.flush().fail();  // flush before checking: a failed write (e.g. disk full) must not be reported as success
+}
+
+bool UnorderedMapPersistableKeyValueStoreService::load() {  // replaces map_ with the records read from file_; returns false if nothing was loaded
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::ifstream ifs(file_);
+  if (!ifs.is_open()) {
+    logger_->log_debug("Failed to open file \"%s\" to load state", file_.c_str());  // NOTE(review): debug level, presumably because a missing file is normal on first start
+    return false;
+  }
+  std::unordered_map<std::string, std::string> map;  // parsed into a temporary so that map_ stays untouched if loading is aborted below
+  std::string line;
+  while (std::getline(ifs, line)) {
+    std::string key, value;
+    if (!parseLine(line, key, value)) {
+      continue;  // malformed lines are skipped; parseLine() has already logged the reason
+    }
+    if (key == FORMAT_VERSION_KEY) {  // the version record is validated here and not stored in the map
+      int format_version = 0;
+      try {
+        format_version = std::stoi(value);
+      } catch (...) {
+        logger_->log_error("Invalid format version number found in \"%s\": \"%s\"", file_.c_str(), value.c_str());
+        return false;
+      }
+      if (format_version > FORMAT_VERSION) {  // files written by a newer format are rejected as a whole
+        logger_->log_error("\"%s\" has been serialized with a larger format version than currently known: %d > %d", file_.c_str(), format_version, FORMAT_VERSION);
+        return false;
+      }
+    } else {
+        map[key] = value;
+    }
+  }
+  map_ = std::move(map);  // only reached when the whole file was read without a version error
+  logger_->log_debug("Loaded state from \"%s\"", file_.c_str());
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
new file mode 100644
index 0000000..f098d7f
--- /dev/null
+++ b/extensions/standard-processors/controllers/UnorderedMapPersistableKeyValueStoreService.h
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__
+#define __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+#include "UnorderedMapKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <unordered_map>
+#include <string>
+#include <mutex>
+#include <memory>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+class UnorderedMapPersistableKeyValueStoreService : public AbstractAutoPersistingKeyValueStoreService,  // UnorderedMapKeyValueStoreService plus persistence of map_ into a file
+                                                    public UnorderedMapKeyValueStoreService {
+ public:
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+  explicit UnorderedMapPersistableKeyValueStoreService(const std::string& name, const std::shared_ptr<Configure>& configuration);
+
+  virtual ~UnorderedMapPersistableKeyValueStoreService();  // calls persist()
+
+  static core::Property File;  // required property "File": path to a file to store state
+
+  virtual void onEnable() override;
+  virtual void initialize() override;
+  virtual void notifyStop() override;
+
+  virtual bool set(const std::string& key, const std::string& value) override;  // like the other mutators below: persists as well when always_persist_ is set
+
+  virtual bool remove(const std::string& key) override;
+
+  virtual bool clear() override;
+
+  virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) override;
+
+  virtual bool persist() override;  // writes the version record and every entry of map_ to file_
+
+ protected:
+  static constexpr const char* FORMAT_VERSION_KEY = "__UnorderedMapPersistableKeyValueStoreService_FormatVersion";  // key of the version record in the file
+  static constexpr int FORMAT_VERSION = 1;  // load() rejects files carrying a larger version
+
+  std::string file_;  // value of the File property, read in onEnable()
+
+  bool load();  // replaces map_ with the contents of file_; false if it cannot be opened or its version is invalid or newer
+
+  std::string escape(const std::string& str);  // escapes backslash, newline and '=' for the line format written by persist()
+  bool parseLine(const std::string& line, std::string& key, std::string& value);  // reverse of escape() for one "key=value" line
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+};
+
+REGISTER_RESOURCE(UnorderedMapPersistableKeyValueStoreService, "A persistable key-value service implemented by a locked std::unordered_map<std::string, std::string> and persisted into a file");
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* __UNORDERED_MAP_PERSISTABLE_KEY_VALUE_STORE_SERVICE_H__ */
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 9e7bbf1..0a52a93 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -69,8 +69,7 @@ namespace minifi {
 namespace processors {
 
 core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "Specifies the file that should be used for storing state about"
-                                   " what data has been ingested so that upon restart NiFi can resume from where it left off",
+core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for state migration from the legacy state file.",
                                    "TailFileState");
 core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character that should be used for delimiting the data being tailed"
                                    "from the incoming file."
@@ -113,6 +112,11 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   tail_states_.clear();
   state_recovered_ = false;
 
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
   std::string value;
 
   if (context->getProperty(Delimiter.getName(), value)) {
@@ -202,7 +206,7 @@ void TailFile::parseStateFileLine(char *buf) {
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, fileName)) {
-      logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+      logger_->log_debug("State migration received path %s, file %s", fileLocation, fileName);
       tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, fileName, 0, 0 }));
     } else {
       tail_states_.insert(std::make_pair(value, TailState { fileLocation, value, 0, 0 }));
@@ -232,20 +236,61 @@ void TailFile::parseStateFileLine(char *buf) {
     const auto file = key.substr(strlen(POSITION_STR));
     tail_states_[file].currentTailFilePosition_ = std::stoull(value);
   }
-
-  return;
 }
 
-bool TailFile::recoverState() {
-  std::ifstream file(state_file_.c_str(), std::ifstream::in);
-  if (!file.good()) {
-    logger_->log_error("load state file failed %s", state_file_);
-    return false;
+
+
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
+  bool state_load_success = false;
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map)) {
+    std::map<std::string, TailState> new_tail_states;
+    // Entries are stored as "file.<i>.*" with consecutive indices; the for-header advances i even when an entry is skipped.
+    for (size_t i = 0; ; ++i) {
+      std::string name;
+      try {
+        name = state_map.at("file." + std::to_string(i) + ".name");
+      } catch (...) {
+        break;  // no entry with this index: everything has been read
+      }
+      try {
+        const std::string& current = state_map.at("file." + std::to_string(i) + ".current");
+        uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position"));
+
+        std::string fileLocation, fileName;
+        if (utils::file::PathUtils::getFileNameAndPath(current, fileLocation, fileName)) {
+          logger_->log_debug("Received path %s, file %s", fileLocation, fileName);
+          new_tail_states.emplace(fileName, TailState { fileLocation, fileName, position, 0 });
+        } else {
+          new_tail_states.emplace(current, TailState { fileLocation, current, position, 0 });
+        }
+      } catch (...) {
+        // Incomplete or unparsable entry: skip it and move on to the next index (retrying the same index would loop forever).
+        logger_->log_error("Skipping invalid stored state entry for %s", name);
+        continue;
+      }
+    }
+    state_load_success = true;
+    tail_states_ = std::move(new_tail_states);
+    for (const auto& s : tail_states_) {
+      logger_->log_debug("TailState %s: %s, %s, %llu, %llu", s.first, s.second.path_, s.second.current_file_name_, s.second.currentTailFilePosition_, s.second.currentTailFileModificationTime_);
+    }
+  } else {
+    logger_->log_info("Found no stored state");
   }
-  tail_states_.clear();
-  char buf[BUFFER_SIZE];
-  for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
-    parseStateFileLine(buf);
+
+  /* We could not get the state from the StateManager, try to migrate the old state file if it exists */
+  if (!state_load_success) {
+    std::ifstream file(state_file_.c_str(), std::ifstream::in);
+    if (!file.good()) {
+      logger_->log_error("load state file failed %s", state_file_);
+      return false;
+    }
+    tail_states_.clear();
+    char buf[BUFFER_SIZE];
+    for (file.getline(buf, BUFFER_SIZE); file.good(); file.getline(buf, BUFFER_SIZE)) {
+      parseStateFileLine(buf);
+    }
   }
 
   /**
@@ -264,28 +309,34 @@ bool TailFile::recoverState() {
     }
   }
 
-  logger_->log_debug("load state file succeeded for %s", state_file_);
+  logger_->log_debug("load state succeeded");
+
+  /* Save the state to the state manager */
+  storeState(context);
+
   return true;
 }
 
-void TailFile::storeState() {
-  std::ofstream file(state_file_.c_str());
-  if (!file.is_open()) {
-    logger_->log_error("store state file failed %s", state_file_);
-    return;
+bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context) {
+  std::unordered_map<std::string, std::string> state;
+  size_t i = 0;
+  for (const auto& tail_state : tail_states_) {
+    state["file." + std::to_string(i) + ".name"] = tail_state.first;
+    state["file." + std::to_string(i) + ".current"] = utils::file::FileUtils::concat_path(tail_state.second.path_, tail_state.second.current_file_name_);
+    state["file." + std::to_string(i) + ".position"] = std::to_string(tail_state.second.currentTailFilePosition_);
+    ++i;
   }
-  for (const auto &state : tail_states_) {
-    file << "FILENAME=" << state.first << "\n";
-    file << CURRENT_STR << state.first << "=" << state.second.path_ << utils::file::FileUtils::get_separator() << state.second.current_file_name_ << "\n";
-    file << POSITION_STR << state.first << "=" << state.second.currentTailFilePosition_ << "\n";
+  if (!state_manager_->set(state)) {
+    logger_->log_error("Failed to set state");
+    return false;
   }
-  file.close();
+  return true;
 }
 
 static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver(TailState &file, const std::string &base_file_name) {
+void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
   std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_;
@@ -339,7 +390,7 @@ void TailFile::checkRollOver(TailState &file, const std::string &base_file_name)
 
     file.current_file_name_ = item.fileName;
 
-    storeState();
+    storeState(context);
   }
 }
 
@@ -352,7 +403,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
   if (!this->state_recovered_) {
     state_recovered_ = true;
     // recover the state if we have not done so
-    this->recoverState();
+    this->recoverState(context);
   }
 
   /**
@@ -361,8 +412,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
   for (auto &state : tail_states_) {
     auto fileLocation = state.second.path_;
 
-    logger_->log_debug("Tailing file %s from %llu", fileLocation, state.second.currentTailFilePosition_);
-    checkRollOver(state.second, state.first);
+    checkRollOver(context, state.second, state.first);
     std::string fullPath = fileLocation + utils::file::FileUtils::get_separator() + state.second.current_file_name_;
     struct stat statbuf;
 
@@ -414,7 +464,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
           ffr->updateKeyedAttribute(FILENAME, logName);
           session->transfer(ffr, Success);
           state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState();
+          storeState(context);
         }
 
       } else {
@@ -429,7 +479,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c
               + extension;
           flowFile->updateKeyedAttribute(FILENAME, logName);
           state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState();
+          storeState(context);
         }
       }
       state.second.currentTailFileModificationTime_ = ((uint64_t) (statbuf.st_mtime) * 1000);
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 470689a..1443af0 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -64,7 +64,6 @@ class TailFile : public core::Processor {
   }
   // Destructor
   virtual ~TailFile() {
-    storeState();
   }
   // Processor Name
   static constexpr char const* ProcessorName = "TailFile";
@@ -92,9 +91,9 @@ class TailFile : public core::Processor {
   // Initialize, over write by NiFi TailFile
   void initialize(void) override;
   // recoverState
-  bool recoverState();
+  bool recoverState(const std::shared_ptr<core::ProcessContext>& context);
   // storeState
-  void storeState();
+  bool storeState(const std::shared_ptr<core::ProcessContext>& context);
 
  private:
 
@@ -105,6 +104,8 @@ class TailFile : public core::Processor {
   std::string state_file_;
   // Delimiter for the data incoming from the tailed file.
   std::string delimiter_;
+  // StateManager
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   // determine if state is recovered;
   bool state_recovered_;
 
@@ -119,7 +120,7 @@ class TailFile : public core::Processor {
   /**
    * Check roll over for the provided file.
    */
-  void checkRollOver(TailState &file, const std::string &base_file_name);
+  void checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name);
   std::shared_ptr<logging::Logger> logger_;
 };
 
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index b715fde..a255c92 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include <stdio.h>
+#include <cstdio>
 #include <fstream>
 #include <map>
 #include <memory>
@@ -32,6 +32,7 @@
 #include "core/Core.h"
 #include "core/FlowFile.h"
 #include "utils/file/FileUtils.h"
+#include "utils/file/PathUtils.h"
 #include "unit/ProvenanceTestHelper.h"
 #include "core/Processor.h"
 #include "core/ProcessContext.h"
@@ -45,6 +46,7 @@ static std::string NEWLINE_FILE = ""  // NOLINT
         "four,five,six, seven";
 static const char *TMP_FILE = "minifi-tmpfile.txt";
 static const char *STATE_FILE = "minifi-state-file.txt";
+
 TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
 
@@ -72,7 +74,6 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
   state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
 
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   testController.runSession(plan, false);
@@ -85,10 +86,6 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
   REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(NEWLINE_FILE.find_first_of('\n')) + " Offset:0"));
 
   LogTestController::getInstance().reset();
-
-  // Delete the test and state file.
-
-  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
 }
 
 TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
@@ -116,11 +113,7 @@ TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
   tmpfile << NEWLINE_FILE;
   tmpfile.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   testController.runSession(plan, true);
@@ -134,14 +127,9 @@ TEST_CASE("TestNewContent", "[tailFileWithDelimiterState]") {
   appendStream << std::endl;
   testController.runSession(plan, true);
 
-  REQUIRE(LogTestController::getInstance().contains("position 14"));
   REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
 
   LogTestController::getInstance().reset();
-
-  // Delete the test and state file.
-
-  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
 }
 
 TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
@@ -170,27 +158,22 @@ TEST_CASE("TestDeleteState", "[tailFileWithDelimiterState]") {
   tmpfile << NEWLINE_FILE;
   tmpfile.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
   testController.runSession(plan, true);
 
   REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
 
   plan->reset(true);  // start a new but with state file
-  remove(std::string(state_file.str() + "." + id).c_str());
+  LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
-  testController.runSession(plan, true);
+  plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->clear();
 
-  REQUIRE(LogTestController::getInstance().contains("position 0"));
+  testController.runSession(plan, true);
 
   // if we lose state we restart
   REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
-
-  // Delete the test and state file.
 }
 
 TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
@@ -222,65 +205,46 @@ TEST_CASE("TestChangeState", "[tailFileWithDelimiterState]") {
   appendStream.write("\n", 1);
   appendStream.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   testController.runSession(plan, true);
-
   REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
 
+  std::string filePath, fileName;
+  REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file.str(), filePath, fileName));
 
   // should stay the same
   for (int i = 0; i < 5; i++) {
     plan->reset(true);  // start a new but with state file
 
-    auto statefile = state_file.str() + "." + id;
-
-    remove(statefile.c_str());
-
-    std::ofstream newstatefile;
-    newstatefile.open(statefile);
-    newstatefile << "FILENAME=" << temp_file.str() << std::endl;
-    newstatefile << "POSITION=14" << std::endl;
-    newstatefile.close();
+    plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->set({{"file.0.name", fileName},
+                                                                                   {"file.0.position", "14"},
+                                                                                   {"file.0.current", temp_file.str()}});
 
     testController.runSession(plan, true);
 
-    REQUIRE(LogTestController::getInstance().contains("position 14"));
-
     // if we lose state we restart
     REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
   }
   for (int i = 14; i < 34; i++) {
     plan->reset(true);  // start a new but with state file
 
-    auto statefile = state_file.str() + "." + id;
+    plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->set({{"file.0.name", fileName},
+                                                                                   {"file.0.position", std::to_string(i)},
+                                                                                   {"file.0.current", temp_file.str()}});
 
-    remove(statefile.c_str());
-
-    std::ofstream newstatefile;
-    newstatefile.open(statefile);
-    newstatefile << "FILENAME=" << temp_file.str() << std::endl;
-    newstatefile << "POSITION=" << i << std::endl;
-    newstatefile.close();
     testController.runSession(plan, true);
-    REQUIRE(LogTestController::getInstance().contains("position " + std::to_string(i)));
   }
 
   plan->runCurrentProcessor();
   for (int i = 14; i < 34; i++) {
     REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile." + std::to_string(i) + "-34.txt"));
   }
-  // Delete the test and state file.
-
-  remove(std::string(state_file.str() + "." + id).c_str());
 }
 
-TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
+
+TEST_CASE("TestStateMigration", "[tailFileStateMigration]") {
   // Create and write to the test file
   TestController testController;
   LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
@@ -288,79 +252,110 @@ TEST_CASE("TestInvalidState", "[tailFileWithDelimiterState]") {
   LogTestController::getInstance().setTrace<TestPlan>();
   LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
 
-  std::shared_ptr<TestPlan> plan = testController.createPlan();
-  std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile", "tailfileProc");
+  auto plan = testController.createPlan();
+  auto tailfile = plan->addProcessor("TailFile", "tailfileProc");
   auto id = tailfile->getUUIDStr();
 
-  plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+  auto logattribute = plan->addProcessor("LogAttribute", "logattribute", core::Relationship("success", "description"), true);
+  plan->setProperty(logattribute, org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
 
   char format[] = "/tmp/gt.XXXXXX";
   auto dir = testController.createTempDirectory(format);
-  std::stringstream temp_file;
-  temp_file << dir << utils::file::FileUtils::get_separator() << TMP_FILE;
 
-  std::ofstream tmpfile;
-  tmpfile.open(temp_file.str());
-  tmpfile << NEWLINE_FILE;
-  tmpfile.close();
+  auto createTempFile = [&](const std::string& file_name) -> std::string /*file_path*/ {
+    std::stringstream temp_file;
+    temp_file << dir << utils::file::FileUtils::get_separator() << file_name;
 
-  std::ofstream appendStream;
-  appendStream.open(temp_file.str(), std::ios_base::app);
-  appendStream.write("\n", 1);
-  appendStream.close();
+    std::ofstream tmpfile;
+    tmpfile.open(temp_file.str(), std::ios::out | std::ios::binary);
+    tmpfile << NEWLINE_FILE << "\n";
+    tmpfile.close();
+
+    return temp_file.str();
+  };
 
   std::stringstream state_file;
   state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
 
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+  auto statefile = state_file.str() + "." + id;
 
-  testController.runSession(plan, true);
+  SECTION("single") {
+    const std::string temp_file = createTempFile(TMP_FILE);
 
-#ifdef WIN32
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-14.txt"));
-#else
-  REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.0-13.txt"));
-#endif
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file);
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
-  plan->reset(true);  // start a new but with state file
+    std::ofstream newstatefile;
+    newstatefile.open(statefile);
+    SECTION("legacy") {
+      newstatefile << "FILENAME=" << temp_file << std::endl;
+      newstatefile << "POSITION=14" << std::endl;
+    }
+    SECTION("newer single") {
+      newstatefile << "FILENAME=" << TMP_FILE << std::endl;
+      newstatefile << "POSITION." << TMP_FILE << "=14" << std::endl;
+      newstatefile << "CURRENT." << TMP_FILE << "=" << temp_file << std::endl;
+    }
+    newstatefile.close();
 
-  auto statefile = state_file.str() + "." + id;
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("minifi-tmpfile.14-34.txt"));
 
-  remove(statefile.c_str());
+    std::unordered_map<std::string, std::string> state;
+    REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
 
-  SECTION("No Filename") {
-  std::ofstream newstatefile;
-  newstatefile.open(statefile);
-  newstatefile << "POSITION=14" << std::endl;
-  newstatefile.close();
-  REQUIRE_THROWS(testController.runSession(plan, true));
+    std::string filePath, fileName;
+    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file, filePath, fileName));
+    std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName},
+                                                                {"file.0.position", "35"},
+                                                                {"file.0.current", temp_file}};
+    REQUIRE(expected_state == state);
   }
 
-  SECTION("Invalid current filename") {
-  std::ofstream newstatefile;
-  newstatefile.open(statefile);
-  newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
-  newstatefile << "CURRENT.minifi-tempfile.txt=minifi-tmpfile.txt" << std::endl;
-  newstatefile << "POSITION=14" << std::endl;
-  newstatefile.close();
-  REQUIRE_THROWS(testController.runSession(plan, true));
-  }
-  SECTION("No current filename and partial path") {
-  std::ofstream newstatefile;
-  newstatefile.open(statefile);
-  newstatefile << "FILENAME=minifi-tmpfile.txt" << std::endl;
-  newstatefile << "POSITION=14" << std::endl;
-  newstatefile.close();
-  REQUIRE_THROWS(testController.runSession(plan, true));
-  }
+  SECTION("multiple") {
+    const std::string file_name_1 = "bar.txt";
+    const std::string file_name_2 = "foo.txt";
+    const std::string temp_file_1 = createTempFile(file_name_1);
+    const std::string temp_file_2 = createTempFile(file_name_2);
+
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), ".*");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
-// Delete the test and state file.
+    std::ofstream newstatefile;
+    newstatefile.open(statefile);
+    newstatefile << "FILENAME=" << file_name_1 << std::endl;
+    newstatefile << "POSITION." << file_name_1 << "=14" << std::endl;
+    newstatefile << "CURRENT." << file_name_1 << "=" << temp_file_1 << std::endl;
+    newstatefile << "FILENAME=" << file_name_2 << std::endl;
+    newstatefile << "POSITION." << file_name_2 << "=15" << std::endl;
+    newstatefile << "CURRENT." << file_name_2 << "=" << temp_file_2 << std::endl;
+    newstatefile.close();
 
-  remove(std::string(std::string(STATE_FILE) + "." + id).c_str());
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains(file_name_1.substr(0, file_name_1.rfind('.')) + ".14-34.txt"));
+    REQUIRE(LogTestController::getInstance().contains(file_name_2.substr(0, file_name_2.rfind('.')) + ".15-34.txt"));
+
+    std::unordered_map<std::string, std::string> state;
+    REQUIRE(plan->getStateManagerProvider()->getCoreComponentStateManager(*tailfile)->get(state));
+
+    std::string filePath1, filePath2, fileName1, fileName2;
+    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_1, filePath1, fileName1));
+    REQUIRE(utils::file::PathUtils::getFileNameAndPath(temp_file_2, filePath2, fileName2));
+    std::unordered_map<std::string, std::string> expected_state{{"file.0.name", fileName1},
+                                                                {"file.0.position", "35"},
+                                                                {"file.0.current", temp_file_1},
+                                                                {"file.1.name", fileName2},
+                                                                {"file.1.position", "35"},
+                                                                {"file.1.current", temp_file_2}};
+    REQUIRE(expected_state == state);
+  }
 }
 
+
 TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
 
@@ -385,11 +380,15 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   tmpfile << NEWLINE_FILE;
   tmpfile.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
+  SECTION("Single") {
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
+  }
 
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
+  SECTION("Multiple") {
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+  }
 
   testController.runSession(plan, false);
   auto records = plan->getProvenanceRecords();
@@ -432,11 +431,7 @@ TEST_CASE("TailFileLongWithDelimiter", "[tailfiletest2]") {
   tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
   tmpfile.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
@@ -486,7 +481,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
   std::string line1(4097, '\n');
   std::mt19937 gen(std::random_device { }());
   std::generate_n(line1.begin(), 4095, [&]() -> char {
-    return 32 + gen() % (127 - 32);
+  return 32 + gen() % (127 - 32);
   });
   std::string line2("foo");
   std::string line3("bar");
@@ -514,11 +509,7 @@ TEST_CASE("TailFileWithDelimiterMultipleDelimiters", "[tailfiletest2]") {
   tmpfile << line1 << "\n" << line2 << "\n" << line3 << "\n" << line4;
   tmpfile.close();
 
-  std::stringstream state_file;
-  state_file << dir << utils::file::FileUtils::get_separator() << STATE_FILE;
-
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), temp_file.str());
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), state_file.str());
   plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
 
   std::shared_ptr<core::Processor> log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
@@ -556,14 +547,13 @@ TEST_CASE("TailWithInvalid", "[tailfiletest2]") {
   auto dir = testController.createTempDirectory(format);
 
   SECTION("No File and No base") {
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
   }
 
   SECTION("No base") {
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "minifi-.*\\.txt");
+    plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
   }
-  plan->setProperty(tailfile, org::apache::nifi::minifi::processors::TailFile::StateFile.getName(), STATE_FILE);
 
   REQUIRE_THROWS(plan->runNextProcessor());
 }
@@ -592,9 +582,6 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
 #endif
   in_file.append("testfifo.txt");
 
-  std::string state_file(dir);
-  state_file.append("tailfile.state");
-
   std::ofstream in_file_stream(in_file);
   in_file_stream << NEWLINE_FILE;
   in_file_stream.flush();
@@ -604,16 +591,16 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
 
   SECTION("single") {
-  plan->setProperty(
-      tail_file,
-      processors::TailFile::FileName.getName(), in_file);
+    plan->setProperty(
+        tail_file,
+        processors::TailFile::FileName.getName(), in_file);
   }
   SECTION("Multiple") {
-  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
-  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
-  plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::FileName.getName(), "test.*");
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple file");
+    plan->setProperty(tail_file, org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
   }
-  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
+
   auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
   plan->setProperty(log_attr, processors::LogAttribute::LogPayload.getName(), "true");
@@ -621,7 +608,7 @@ TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
 
   plan->runNextProcessor();  // Tail
   plan->runNextProcessor();  // Log
-  std::cout << " find " << expected_pieces << std::endl;
+
   REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + std::to_string(expected_pieces) + " flow files"));
 
   in_file_stream << DELIM;
@@ -665,9 +652,6 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
   char format[] = "/tmp/gt.XXXXXX";
   auto dir = testController.createTempDirectory(format);
 
-  std::string state_file(dir);
-  state_file.append("tailfile.state");
-
   // Define test input file
   std::string in_file(dir);
   in_file.append("fruits.txt");
@@ -687,7 +671,6 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
   auto tail_file = plan->addProcessor("TailFile", "Tail");
   plan->setProperty(tail_file, processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
   plan->setProperty(tail_file, processors::TailFile::FileName.getName(), in_file);
-  plan->setProperty(tail_file, processors::TailFile::StateFile.getName(), state_file);
   auto log_attr = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attr, processors::LogAttribute::FlowFilesToLog.getName(), "0");
   // Log as many FFs as it can to make sure exactly the expected amount is produced
@@ -711,3 +694,4 @@ TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
 
   REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow files")));
 }
+
diff --git a/extensions/windows-event-log/Bookmark.cpp b/extensions/windows-event-log/Bookmark.cpp
index d727de2..1bbcfe9 100644
--- a/extensions/windows-event-log/Bookmark.cpp
+++ b/extensions/windows-event-log/Bookmark.cpp
@@ -2,6 +2,7 @@
 
 #include <direct.h>
 
+#include "wel/UnicodeConversion.h"
 #include "utils/file/FileUtils.h"
 #include "utils/ScopeGuard.h"
 
@@ -10,16 +11,30 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-
-Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<logging::Logger> logger)
-  :logger_(logger) {
-  if (!createUUIDDir(bookmarkRootDir, uuid, filePath_))
-    return;
-
-  filePath_ += "Bookmark.txt";
-
-  if (!getBookmarkXmlFromFile(bookmarkXml_)) {
-    return;
+static const std::string BOOKMARK_KEY = "bookmark";
+
+Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger)
+  : logger_(logger)
+  , state_manager_(state_manager) {
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager_->get(state_map) && state_map.count(BOOKMARK_KEY) == 1U) {
+    bookmarkXml_ = wel::to_wstring(state_map[BOOKMARK_KEY].c_str());
+  } else if (!bookmarkRootDir.empty()) {
+    filePath_ = utils::file::FileUtils::concat_path(
+      utils::file::FileUtils::concat_path(
+        utils::file::FileUtils::concat_path(bookmarkRootDir, "uuid"), uuid), "Bookmark.txt");
+
+    std::wstring bookmarkXml;
+    if (getBookmarkXmlFromFile(bookmarkXml)) {
+      if (saveBookmarkXml(bookmarkXml) && state_manager_->persist()) {
+        logger_->log_info("State migration successful");
+        rename(filePath_.c_str(), (filePath_ + "-migrated").c_str());
+      } else {
+        logger_->log_warn("Could not migrate state from specified State Directory %s", bookmarkRootDir);
+      }
+    }
+  } else {
+    logger_->log_info("Found no stored state");
   }
 
   if (!bookmarkXml_.empty()) {
@@ -31,9 +46,8 @@ Bookmark::Bookmark(const std::wstring& channel, const std::wstring& query, const
     LOG_LAST_ERROR(EvtCreateBookmark);
 
     bookmarkXml_.clear();
-    if (!createEmptyBookmarkXmlFile()) {
-      return;
-    }
+    state_map.erase(BOOKMARK_KEY);
+    state_manager_->set(state_map);
   }
 
   if (!(hBookmark_ = EvtCreateBookmark(0))) {
@@ -88,6 +102,15 @@ EVT_HANDLE Bookmark::getBookmarkHandleFromXML() {
   return hBookmark_;
 }
 
+/**
+ * Remembers the given bookmark XML and hands it to the state manager.
+ *
+ * @param bookmarkXml bookmark XML to remember and store
+ * @return the result of the state manager's set() call
+ */
+bool Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
+  // The in-memory copy is updated first, so it reflects the new bookmark
+  // regardless of what set() returns below.
+  bookmarkXml_ = bookmarkXml;
+
+  // The state map is string-to-string, so the wide XML is converted with
+  // wel::to_string before being stored under the single bookmark key.
+  const std::unordered_map<std::string, std::string> bookmark_state{
+      {BOOKMARK_KEY, wel::to_string(bookmarkXml_.c_str())}};
+
+  return state_manager_->set(bookmark_state);
+}
+
 bool Bookmark::saveBookmark(EVT_HANDLE hEvent)
 {
   std::wstring bookmarkXml;
@@ -95,9 +118,7 @@ bool Bookmark::saveBookmark(EVT_HANDLE hEvent)
     return false;
   }
 
-  saveBookmarkXml(bookmarkXml);
-
-  return true;
+  return saveBookmarkXml(bookmarkXml);
 }
 
 bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
@@ -122,7 +143,7 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
         return false;
       }
 
-      bookmarkXml = &buf[0];
+      bookmarkXml = buf.data();
 
       return true;
     }
@@ -135,62 +156,12 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
   return false;
 }
 
-void Bookmark::saveBookmarkXml(const std::wstring& bookmarkXml) {
-  bookmarkXml_ = bookmarkXml;
-
-  // Write new bookmark over old and in the end write '!'. Then new bookmark is read until '!'. This is faster than truncate.
-  file_.seekp(std::ios::beg);
-
-  file_ << bookmarkXml << L'!';
-
-  file_.flush();
-}
-
-bool Bookmark::createEmptyBookmarkXmlFile() {
-  if (file_.is_open()) {
-    file_.close();
-  }
-
-  file_.open(filePath_, std::ios::out);
-  if (!file_.is_open()) {
-    logger_->log_error("Cannot open %s", filePath_.c_str());
-    return false;
-  }
-
-  return true;
-}
-
-bool Bookmark::createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir)
-{
-  if (bookmarkRootDir.empty()) {
-    dir.clear();
-    return false;
-  }
-
-  auto dirWithBackslash = bookmarkRootDir;
-  if (bookmarkRootDir.back() != '\\') {
-    dirWithBackslash += '\\';
-  }
-  
-  dir = dirWithBackslash + "uuid\\" + uuid + "\\";
-
-  utils::file::FileUtils::create_dir(dir);
-
-  auto dirCreated = utils::file::FileUtils::is_directory(dir.c_str());
-  if (!dirCreated) {
-    logger_->log_error("Cannot create %s", dir.c_str());
-    dir.clear();
-  }
-
-  return dirCreated;
-}
-
 bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
   bookmarkXml.clear();
 
   std::wifstream file(filePath_);
   if (!file.is_open()) {
-    return createEmptyBookmarkXmlFile();
+    return false;
   }
 
   // Generically is not efficient, but bookmarkXML is small ~100 bytes. 
@@ -206,13 +177,6 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
 
   file.close();
 
-  file_.open(filePath_);
-  if (!file_.is_open()) {
-    logger_->log_error("Cannot open %s", filePath_.c_str());
-    bookmarkXml.clear();
-    return false;
-  }
-
   if (bookmarkXml.empty()) {
     return true;
   }
@@ -222,7 +186,7 @@ bool Bookmark::getBookmarkXmlFromFile(std::wstring& bookmarkXml) {
   if (std::wstring::npos == pos) {
     logger_->log_error("No '!' in bookmarXml '%ls'", bookmarkXml.c_str());
     bookmarkXml.clear();
-    return createEmptyBookmarkXmlFile();
+    return false;
   }
 
   // Remove '!'.
diff --git a/extensions/windows-event-log/Bookmark.h b/extensions/windows-event-log/Bookmark.h
index 02e47bb..4d40064 100644
--- a/extensions/windows-event-log/Bookmark.h
+++ b/extensions/windows-event-log/Bookmark.h
@@ -19,25 +19,22 @@ namespace processors {
 class Bookmark
 {
 public:
-  Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<logging::Logger> logger);
+  Bookmark(const std::wstring& channel, const std::wstring& query, const std::string& bookmarkRootDir, const std::string& uuid, bool processOldEvents, std::shared_ptr<core::CoreComponentStateManager> state_manager, std::shared_ptr<logging::Logger> logger);
   ~Bookmark();
   operator bool() const;
   EVT_HANDLE getBookmarkHandleFromXML();
   bool getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml);
-  void saveBookmarkXml(const std::wstring& bookmarkXml);
+  bool saveBookmarkXml(const std::wstring& bookmarkXml);
 private:
   bool saveBookmark(EVT_HANDLE hEvent);
-  bool createEmptyBookmarkXmlFile();
-  bool createUUIDDir(const std::string& bookmarkRootDir, const std::string& uuid, std::string& dir);
-  std::string filePath(const std::string& uuid);
   bool getBookmarkXmlFromFile(std::wstring& bookmarkXml);
 
 private:
   std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   std::string filePath_;
   bool ok_{};
   EVT_HANDLE hBookmark_{};
-  std::wfstream file_;
   std::wstring bookmarkXml_;
 };
 
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index c7f41c7..85686c0 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -146,7 +146,7 @@ core::Property ConsumeWindowsEventLog::BookmarkRootDirectory(
   core::PropertyBuilder::createProperty("State Directory")->
   isRequired(false)->
   withDefaultValue("CWELState")->
-  withDescription("Directory which contains processor state data.")->
+  withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")->
   build());
 
 core::Property ConsumeWindowsEventLog::ProcessOldEvents(
@@ -201,6 +201,11 @@ bool ConsumeWindowsEventLog::insertHeaderName(wel::METADATA_NAMES &header, const
 }
 
 void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
+  state_manager_ = context->getStateManager();
+  if (state_manager_ == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+
   context->getProperty(IdentifierMatcher.getName(), regex_);
   context->getProperty(ResolveAsAttributes.getName(), resolve_as_attributes_);
   context->getProperty(IdentifierFunction.getName(), apply_identifier_function_);
@@ -263,7 +268,7 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
       logger_->log_error("State Directory is empty");
       return;
     }
-    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, logger_);
+    pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, state_manager_, logger_);
     if (!*pBookmark_) {
       pBookmark_.reset();
       return;
@@ -591,10 +596,6 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
   }
 }
 
-void ConsumeWindowsEventLog::notifyStop()
-{
-}
-
 void ConsumeWindowsEventLog::LogWindowsError()
 {
   auto error_id = GetLastError();
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.h b/extensions/windows-event-log/ConsumeWindowsEventLog.h
index 02ce868..c01eb89 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.h
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.h
@@ -97,7 +97,6 @@ public:
   virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   //! Initialize, overwrite by NiFi ConsumeWindowsEventLog
   virtual void initialize(void) override;
-  virtual void notifyStop() override;
   
 
 protected:
@@ -114,12 +113,13 @@ protected:
 
 private:
   // Logger
+  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
   wel::METADATA_NAMES header_names_;
   std::string header_delimiter_;
   std::string channel_;
   std::wstring wstrChannel_;
   std::wstring wstrQuery_;
-  std::shared_ptr<logging::Logger> logger_;
   std::string regex_;
   bool resolve_as_attributes_;
   bool apply_identifier_function_;
diff --git a/extensions/windows-event-log/wel/UnicodeConversion.h b/extensions/windows-event-log/wel/UnicodeConversion.h
index 01c48bc..90f5200 100644
--- a/extensions/windows-event-log/wel/UnicodeConversion.h
+++ b/extensions/windows-event-log/wel/UnicodeConversion.h
@@ -34,6 +34,11 @@ namespace org {
             ATL::CW2A aString(pChar, CP_UTF8);
             return std::string(aString);
           }
+
+          /**
+           * Converts a null-terminated narrow string, interpreted as UTF-8
+           * (CP_UTF8), to a wide string.
+           *
+           * @param pChar string to convert; may be null
+           * @return the converted wide string, or an empty string when pChar is null
+           */
+          inline std::wstring to_wstring(const char* pChar) {
+            // Guard against null: constructing std::wstring from a null
+            // pointer is undefined behaviour.
+            if (pChar == nullptr) {
+              return std::wstring();
+            }
+            ATL::CA2W wString(pChar, CP_UTF8);
+            return std::wstring(wString);
+          }
         } /* namespace wel */
       } /* namespace minifi */
     } /* namespace nifi */
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 9eaaf58..4a2c296 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF)
 	set(TLS_SOURCES "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
 
 if(WIN32)
 	include(FindMessageCompiler)
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5efac83..0ee0e01 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -219,6 +219,11 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
 
   /**
+   * Removes all controller services.
+   */
+  virtual void clearControllerServices();
+
+  /**
    * Gets all controller services.
    */
   virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices();
@@ -286,6 +291,11 @@ class FlowController : public core::controller::ControllerServiceProvider, publi
   virtual void enableAllControllerServices();
 
   /**
+   * Disables all controller services for the provider.
+   */
+  virtual void disableAllControllerServices();
+
+  /**
    * Retrieves metrics node
    * @return metrics response node
    */
diff --git a/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
new file mode 100644
index 0000000..e632503
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_
+
+#include "PersistableKeyValueStoreService.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "core/Resource.h"
+
+#include <condition_variable>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <utility>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Abstract persistable key-value store service that additionally declares
+ * the configuration properties and the thread, mutex and condition variable
+ * members used for automatic persistence.
+ *
+ * NOTE(review): only the declaration is visible here; the exact persistence
+ * behaviour is defined in the implementation file - confirm there.
+ */
+class AbstractAutoPersistingKeyValueStoreService : public PersistableKeyValueStoreService {
+ public:
+  explicit AbstractAutoPersistingKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit AbstractAutoPersistingKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  virtual ~AbstractAutoPersistingKeyValueStoreService();
+
+  // Properties declared by this service.
+  static core::Property AlwaysPersist;
+  static core::Property AutoPersistenceInterval;
+
+  // Overrides of the base class hooks.
+  void initialize() override;
+  void onEnable() override;
+  void notifyStop() override;
+
+ protected:
+  // Presumably hold the values of the two properties above - TODO confirm.
+  bool always_persist_;
+  uint64_t auto_persistence_interval_;
+
+  // Presumably persistingThreadFunc() runs on persisting_thread_, with
+  // running_, persisting_mutex_ and persisting_cv_ coordinating it - TODO confirm.
+  std::thread persisting_thread_;
+  bool running_;
+  std::mutex persisting_mutex_;
+  std::condition_variable persisting_cv_;
+  void persistingThreadFunc();
+
+ private:
+  std::shared_ptr<logging::Logger> logger_;
+
+  void stopPersistingThread();
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_AbstractAutoPersistingKeyValueStoreService_H_ */
diff --git a/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
new file mode 100644
index 0000000..cddd152
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_
+
+#include "core/Core.h"
+#include "core/CoreComponentState.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+ /**
+  * Partial implementation of core::CoreComponentStateManagerProvider.
+  * Concrete subclasses supply the storage primitives (setImpl, getImpl,
+  * removeImpl, persistImpl); this class declares the state manager type that
+  * is handed out per component id, plus (de)serialization helpers.
+  */
+ class AbstractCoreComponentStateManagerProvider : public std::enable_shared_from_this<AbstractCoreComponentStateManagerProvider>,
+                                                   public core::CoreComponentStateManagerProvider {
+ public:
+  ~AbstractCoreComponentStateManagerProvider() override;
+
+  // Returns the state manager for the component identified by uuid.
+  std::shared_ptr<core::CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) override;
+
+  // Returns the stored states; NOTE(review): outer key is presumably the component uuid - confirm.
+  std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() override;
+
+  /**
+   * State manager bound to one id and to the provider that created it.
+   */
+  class AbstractCoreComponentStateManager : public core::CoreComponentStateManager {
+   public:
+    AbstractCoreComponentStateManager(std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider, const std::string& id);
+
+    bool set(const std::unordered_map<std::string, std::string>& kvs) override;
+    bool get(std::unordered_map<std::string, std::string>& kvs) override;
+    bool clear() override;
+    bool persist() override;
+
+   private:
+    std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider_;
+    std::string id_;
+    bool state_valid_;
+    std::unordered_map<std::string, std::string> state_;
+  };
+
+ protected:
+  // Storage primitives to be implemented by the concrete key-value store.
+  virtual bool setImpl(const std::string& key, const std::string& value) = 0;
+  virtual bool getImpl(const std::string& key, std::string& value) = 0;
+  virtual bool getImpl(std::unordered_map<std::string, std::string>& kvs) = 0;
+  virtual bool removeImpl(const std::string& key) = 0;
+  virtual bool persistImpl() = 0;
+
+  // Conversion between a key-value map and its single-string representation.
+  virtual std::string serialize(const std::unordered_map<std::string, std::string>& kvs);
+  virtual bool deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs);
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_AbstractCoreComponentStateManagerProvider_H_ */
diff --git a/libminifi/include/controllers/keyvalue/KeyValueStoreService.h b/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
new file mode 100644
index 0000000..5594435
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/KeyValueStoreService.h
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_
+
+#include "core/Core.h"
+#include "properties/Configure.h"
+#include "core/controller/ControllerService.h"
+
+#include <unordered_map>
+#include <unordered_set>
+#include <string>
+#include <cstdint>
+#include <functional>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Controller service interface of a string-to-string key-value store.
+ * All storage operations are pure virtual and report their outcome as bool.
+ */
+class KeyValueStoreService : public core::controller::ControllerService {
+ public:
+  explicit KeyValueStoreService(const std::string& name, const std::string& id);
+  explicit KeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  virtual ~KeyValueStoreService();
+
+  void yield() override;
+  bool isRunning() override;
+  bool isWorkAvailable() override;
+
+  // Stores value under key.
+  virtual bool set(const std::string& key, const std::string& value) = 0;
+
+  // Reads the value stored under key into value.
+  virtual bool get(const std::string& key, std::string& value) = 0;
+
+  // Reads the stored key-value pairs into kvs.
+  virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+
+  // Removes the entry stored under key.
+  virtual bool remove(const std::string& key) = 0;
+
+  // Removes the stored entries.
+  virtual bool clear() = 0;
+
+  // Updates the entry under key through update_func, which receives whether the
+  // key exists and a mutable reference to the value.
+  // NOTE(review): the meaning of update_func's bool result is not visible here - confirm.
+  virtual bool update(const std::string& key, const std::function<bool(bool /*exists*/, std::string& /*value*/)>& update_func) = 0;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_KeyValueStoreService_H_ */
diff --git a/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
new file mode 100644
index 0000000..c9bda4f
--- /dev/null
+++ b/libminifi/include/controllers/keyvalue/PersistableKeyValueStoreService.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+#define LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_
+
+#include "KeyValueStoreService.h"
+#include "AbstractCoreComponentStateManagerProvider.h"
+#include "core/Core.h"
+#include "properties/Configure.h"
+
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Key-value store service that can be persisted and that also acts as a
+ * component state manager provider: it overrides the storage primitives
+ * required by AbstractCoreComponentStateManagerProvider.
+ */
+class PersistableKeyValueStoreService : virtual public KeyValueStoreService, public AbstractCoreComponentStateManagerProvider {
+ public:
+  explicit PersistableKeyValueStoreService(const std::string& name, const std::string& id);
+  explicit PersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid = utils::Identifier());
+
+  ~PersistableKeyValueStoreService() override;
+
+  // Persists the store; to be implemented by the concrete service.
+  virtual bool persist() = 0;
+
+ protected:
+  // AbstractCoreComponentStateManagerProvider storage primitives.
+  bool setImpl(const std::string& key, const std::string& value) override;
+  bool getImpl(const std::string& key, std::string& value) override;
+  bool getImpl(std::unordered_map<std::string, std::string>& kvs) override;
+  bool removeImpl(const std::string& key) override;
+  bool persistImpl() override;
+};
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_KEYVALUE_PersistableKeyValueStoreService_H_ */
diff --git a/libminifi/include/core/ConfigurableComponent.h b/libminifi/include/core/ConfigurableComponent.h
index 4905001..e3d6d62 100644
--- a/libminifi/include/core/ConfigurableComponent.h
+++ b/libminifi/include/core/ConfigurableComponent.h
@@ -98,11 +98,13 @@ class ConfigurableComponent {
    * @return result of set operation.
    */
   bool setSupportedProperties(std::set<Property> properties);
+
   /**
-   * Sets supported properties for the ConfigurableComponent
-   * @param supported properties
-   * @return result of set operation.
+   * Updates the supported properties for the ConfigurableComponent
+   * @param properties new supported properties
+   * @return result of update operation.
    */
+  bool updateSupportedProperties(std::set<Property> properties);
 
   /**
    * Gets whether or not this processor supports dynamic properties.
diff --git a/libminifi/include/core/CoreComponentState.h b/libminifi/include/core/CoreComponentState.h
new file mode 100644
index 0000000..21c7613
--- /dev/null
+++ b/libminifi/include/core/CoreComponentState.h
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+#define LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_
+
+#include "Core.h"
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <string>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace core {
+
+/**
+ * Interface for reading, replacing, clearing and persisting the key-value
+ * state of a single component. Every operation reports its outcome as bool.
+ */
+class CoreComponentStateManager {
+ public:
+  virtual ~CoreComponentStateManager() = default;
+
+  // Sets the component state from kvs.
+  virtual bool set(const std::unordered_map<std::string, std::string>& kvs) = 0;
+
+  // Reads the component state into kvs.
+  virtual bool get(std::unordered_map<std::string, std::string>& kvs) = 0;
+
+  // Clears the component state.
+  virtual bool clear() = 0;
+
+  // Persists the component state.
+  virtual bool persist() = 0;
+};
+
+/**
+ * Interface handing out CoreComponentStateManager instances, looked up either
+ * by uuid string or by component, and exposing all stored component states.
+ */
+class CoreComponentStateManagerProvider {
+ public:
+  virtual ~CoreComponentStateManagerProvider() = default;
+
+  // Returns the state manager for the component identified by uuid.
+  virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const std::string& uuid) = 0;
+
+  // Convenience overload: resolves the component's UUID string and delegates to the uuid overload.
+  virtual std::shared_ptr<CoreComponentStateManager> getCoreComponentStateManager(const CoreComponent& component) {
+    const auto& component_uuid = component.getUUIDStr();
+    return getCoreComponentStateManager(component_uuid);
+  }
+
+  // Returns the stored states; NOTE(review): outer key is presumably the component uuid - confirm.
+  virtual std::unordered_map<std::string, std::unordered_map<std::string, std::string>> getAllCoreComponentStates() = 0;
+};
+
+} /* namespace core */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
+
+#endif /* LIBMINIFI_INCLUDE_CORE_CoreComponentState_H_ */
diff --git a/libminifi/include/core/ProcessContext.h b/libminifi/include/core/ProcessContext.h
index db44db9..a6be32d 100644
--- a/libminifi/include/core/ProcessContext.h
+++ b/libminifi/include/core/ProcessContext.h
@@ -32,9 +32,12 @@
 #include "core/controller/ControllerServiceProvider.h"
 #include "core/controller/ControllerServiceLookup.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
 #include "ProcessorNode.h"
 #include "core/Repository.h"
 #include "core/FlowFile.h"
+#include "core/CoreComponentState.h"
+#include "utils/file/FileUtils.h"
 #include "VariableRegistry.h"
 
 namespace org {
@@ -62,6 +65,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
         configure_(std::make_shared<minifi::Configure>()),
         initialized_(false) {
     repo_ = repo;
+    state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, nullptr);
   }
 
   // Constructor
@@ -79,6 +83,7 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
         logger_(logging::LoggerFactory<ProcessContext>::getLogger()),
         initialized_(false) {
     repo_ = repo;
+    state_manager_provider_ = getStateManagerProvider(logger_, controller_service_provider_, configuration);
   }
   // Destructor
   virtual ~ProcessContext() {
@@ -208,6 +213,112 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
   bool isInitialized() const {
       return initialized_;
   }
+
+  // Controller service name under which the default state manager provider is created and looked up.
+  static constexpr char const* DefaultStateManagerProviderName = "defaultstatemanagerprovider";
+
+  /**
+   * Returns the state manager of this context's processor node.
+   * @return the state manager, or nullptr when no state manager provider is set
+   */
+  std::shared_ptr<CoreComponentStateManager> getStateManager() {
+    if (state_manager_provider_ != nullptr) {
+      return state_manager_provider_->getCoreComponentStateManager(*processor_node_);
+    }
+    return nullptr;
+  }
+
+  /**
+   * Returns the default CoreComponentStateManagerProvider registered under
+   * DefaultStateManagerProviderName, creating, configuring and enabling it when
+   * no controller service node with that name exists yet.
+   *
+   * A RocksDbPersistableKeyValueStoreService is tried first; if it cannot be set
+   * up, an UnorderedMapPersistableKeyValueStoreService is tried instead.
+   * Calls are serialized through a function-local static mutex.
+   *
+   * @param controller_service_provider used to look up and create the service; nullptr yields nullptr
+   * @param configuration optional source of the always-persist and auto-persistence-interval
+   *        settings; may be nullptr, in which case those properties are not set on the new service
+   * @param base_path path the storage location is derived from; nullptr is treated as ""
+   * @return the provider, or nullptr if none could be obtained
+   */
+  static std::shared_ptr<core::CoreComponentStateManagerProvider> getOrCreateDefaultStateManagerProvider(
+      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      std::shared_ptr<minifi::Configure> configuration,
+      const char *base_path = "") {
+    /* Nothing can be looked up or created without a controller service provider */
+    if (controller_service_provider == nullptr) {
+      return nullptr;
+    }
+    const std::string storage_root = base_path != nullptr ? base_path : "";
+
+    static std::mutex mutex;
+    std::lock_guard<std::mutex> lock(mutex);
+
+    /* See if we have already created a default provider */
+    std::shared_ptr<core::controller::ControllerServiceNode> node = controller_service_provider->getControllerServiceNode(DefaultStateManagerProviderName);
+    if (node != nullptr) {
+      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(node->getControllerServiceImplementation());
+    }
+
+    /* Try to get configuration options for default provider.
+     * The configuration is optional: getStateManagerProvider forwards a null one. */
+    std::string always_persist, auto_persistence_interval;
+    if (configuration != nullptr) {
+      configuration->get(Configure::nifi_state_management_provider_local_always_persist, always_persist);
+      configuration->get(Configure::nifi_state_management_provider_local_auto_persistence_interval, auto_persistence_interval);
+    }
+
+    /* Function to help creating a provider.
+     * NOTE(review): when a step after createControllerService fails, the node may stay
+     * registered under the default name - confirm how the provider handles that. */
+    auto create_provider = [&](
+        const std::string& type,
+        const std::string& longType,
+        const std::unordered_map<std::string, std::string>& extraProperties) -> std::shared_ptr<core::CoreComponentStateManagerProvider> {
+      node = controller_service_provider->createControllerService(type, longType, DefaultStateManagerProviderName, true /*firstTimeAdded*/);
+      if (node == nullptr) {
+        return nullptr;
+      }
+      node->initialize();
+      auto provider = node->getControllerServiceImplementation();
+      if (provider == nullptr) {
+        return nullptr;
+      }
+      /* Only forward the persistence options that were actually configured */
+      if (!always_persist.empty() && !provider->setProperty(
+          controllers::AbstractAutoPersistingKeyValueStoreService::AlwaysPersist.getName(), always_persist)) {
+        return nullptr;
+      }
+      if (!auto_persistence_interval.empty() && !provider->setProperty(
+          controllers::AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval.getName(), auto_persistence_interval)) {
+        return nullptr;
+      }
+      for (const auto& extraProperty : extraProperties) {
+        if (!provider->setProperty(extraProperty.first, extraProperty.second)) {
+          return nullptr;
+        }
+      }
+      if (!node->enable()) {
+        return nullptr;
+      }
+      return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(provider);
+    };
+
+    /* Try to create a RocksDB-backed provider */
+    auto provider = create_provider("RocksDbPersistableKeyValueStoreService",
+        "org.apache.nifi.minifi.controllers.RocksDbPersistableKeyValueStoreService",
+        {{"Directory", utils::file::FileUtils::concat_path(storage_root, "corecomponentstate")}});
+    if (provider != nullptr) {
+      return provider;
+    }
+
+    /* Fall back to a locked unordered map-backed provider */
+    provider = create_provider("UnorderedMapPersistableKeyValueStoreService",
+        "org.apache.nifi.minifi.controllers.UnorderedMapPersistableKeyValueStoreService",
+        {{"File", utils::file::FileUtils::concat_path(storage_root, "corecomponentstate.txt")}});
+    if (provider != nullptr) {
+      return provider;
+    }
+
+    /* Give up */
+    return nullptr;
+  }
+
+  /**
+   * Resolves the CoreComponentStateManagerProvider to use.
+   *
+   * When the configuration names a provider through
+   * nifi_state_management_provider_local, that controller service is looked up;
+   * otherwise the default provider is obtained via
+   * getOrCreateDefaultStateManagerProvider. Failures are logged as errors.
+   *
+   * @param logger logger used for error reporting
+   * @param controller_service_provider source of controller services; nullptr yields nullptr
+   * @param configuration configuration to consult; may be nullptr
+   * @return the provider, or nullptr if it could not be found or created
+   */
+  static std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider(
+      std::shared_ptr<logging::Logger> logger,
+      std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider,
+      std::shared_ptr<minifi::Configure> configuration) {
+    if (controller_service_provider == nullptr) {
+      return nullptr;
+    }
+
+    std::string id;
+    const bool provider_configured =
+        configuration != nullptr && configuration->get(minifi::Configure::nifi_state_management_provider_local, id);
+
+    if (!provider_configured) {
+      // No explicit provider configured: use the default one.
+      auto default_provider = getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration);
+      if (default_provider == nullptr) {
+        logger->log_error("Failed to create default CoreComponentStateManagerProvider");
+      }
+      return default_provider;
+    }
+
+    auto configured_node = controller_service_provider->getControllerServiceNode(id);
+    if (configured_node == nullptr) {
+      logger->log_error("Failed to find the CoreComponentStateManagerProvider %s defined by %s", id, minifi::Configure::nifi_state_management_provider_local);
+      return nullptr;
+    }
+    return std::dynamic_pointer_cast<core::CoreComponentStateManagerProvider>(configured_node->getControllerServiceImplementation());
+  }
+
  private:
 
   template<typename T>
@@ -217,6 +328,8 @@ class ProcessContext : public controller::ControllerServiceLookup, public core::
 
   // controller service provider.
   std::shared_ptr<controller::ControllerServiceProvider> controller_service_provider_;
+  // state manager provider
+  std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
   // repository shared pointer.
   std::shared_ptr<core::Repository> repo_;
   std::shared_ptr<core::Repository> flow_repo_;
diff --git a/libminifi/include/core/controller/ControllerService.h b/libminifi/include/core/controller/ControllerService.h
index 5231592..d42d6f5 100644
--- a/libminifi/include/core/controller/ControllerService.h
+++ b/libminifi/include/core/controller/ControllerService.h
@@ -137,6 +137,9 @@ class ControllerService : public ConfigurableComponent, public Connectable {
 
   void setState(ControllerServiceState state) {
     current_state_ = state;
+    // A transition to DISABLED additionally triggers the notifyStop() hook.
+    if (state == DISABLED) {
+      notifyStop();
+    }
   }
 
   virtual bool supportsDynamicProperties() {
diff --git a/libminifi/include/core/controller/ControllerServiceProvider.h b/libminifi/include/core/controller/ControllerServiceProvider.h
index 6a147e1..75f8b23 100644
--- a/libminifi/include/core/controller/ControllerServiceProvider.h
+++ b/libminifi/include/core/controller/ControllerServiceProvider.h
@@ -113,6 +113,11 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
   virtual std::future<utils::TaskRescheduleInfo> disableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) = 0;
 
   /**
+   * Removes all controller services.
+   */
+  virtual void clearControllerServices() = 0;
+
+  /**
    * Gets a list of all controller services.
    */
   virtual std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> getAllControllerServices() {
@@ -215,6 +220,8 @@ class ControllerServiceProvider : public CoreComponent, public ConfigurableCompo
 
   virtual void enableAllControllerServices() = 0;
 
+  /**
+   * Disables all controller services; counterpart of enableAllControllerServices().
+   */
+  virtual void disableAllControllerServices() = 0;
+
   virtual bool supportsDynamicProperties() {
     return false;
   }
diff --git a/libminifi/include/core/controller/StandardControllerServiceProvider.h b/libminifi/include/core/controller/StandardControllerServiceProvider.h
index 6ce6651..8dc6f1b 100644
--- a/libminifi/include/core/controller/StandardControllerServiceProvider.h
+++ b/libminifi/include/core/controller/StandardControllerServiceProvider.h
@@ -116,7 +116,6 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
   virtual void enableAllControllerServices() {
     logger_->log_info("Enabling %u controller services", controller_map_->getAllControllerServices().size());
     for (auto service : controller_map_->getAllControllerServices()) {
-
       if (service->canEnable()) {
         logger_->log_info("Enabling %s", service->getName());
         agent_->enableControllerService(service);
@@ -126,6 +125,15 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     }
   }
 
+  /**
+   * Calls disable() on every controller service held by the controller map and
+   * logs a warning for each service whose disable() call reports failure.
+   */
+  virtual void disableAllControllerServices() {
+    logger_->log_info("Disabling %u controller services", controller_map_->getAllControllerServices().size());
+    for (auto service : controller_map_->getAllControllerServices()) {
+      const bool disabled = service->disable();
+      if (disabled) {
+        continue;
+      }
+      logger_->log_warn("Could not disable %s", service->getName());
+    }
+  }
+
   void enableControllerServices(std::vector<std::shared_ptr<ControllerServiceNode>> serviceNodes) {
     for (auto node : serviceNodes) {
       enableControllerService(node);
@@ -141,6 +149,10 @@ class StandardControllerServiceProvider : public ControllerServiceProvider, publ
     }
   }
 
+  // Removes all controller services by clearing the controller map.
+  void clearControllerServices() {
+    controller_map_->clear();
+  }
+
   void verifyCanStopReferencingComponents(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode) {
   }
 
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 824d386..99adea5 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -100,6 +100,11 @@ class Configure : public Properties {
   static const char *nifi_c2_flow_base_url;
   static const char *nifi_c2_full_heartbeat;
 
+  // state management options
+  static const char *nifi_state_management_provider_local;
+  static const char *nifi_state_management_provider_local_always_persist;
+  static const char *nifi_state_management_provider_local_auto_persistence_interval;
+
  private:
   std::string agent_identifier_;
   std::string agent_class_;
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index fbb0fa9..04f585a 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -63,6 +63,9 @@ const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
 const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
 const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
 const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
+// state management options
+const char *Configure::nifi_state_management_provider_local = "nifi.state.management.provider.local";
+const char *Configure::nifi_state_management_provider_local_always_persist = "nifi.state.management.provider.local.always.persist";
+const char *Configure::nifi_state_management_provider_local_auto_persistence_interval = "nifi.state.management.provider.local.auto.persistence.interval";
 
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 350b64d..1cff30e 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -248,6 +248,8 @@ int16_t FlowController::stop(bool force, uint64_t timeToWait) {
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
     this->cron_scheduler_->stop();
+    // stop the ControllerServices
+    this->controller_service_provider_->disableAllControllerServices();
     thread_pool_.shutdown();
     running_ = false;
   }
@@ -793,6 +795,13 @@ std::future<utils::TaskRescheduleInfo> FlowController::disableControllerService(
 }
 
 /**
+ * Removes all controller services.
+ */
+void FlowController::clearControllerServices() {
+  // Forwarded to the underlying controller service provider.
+  controller_service_provider_->clearControllerServices();
+}
+
+/**
  * Gets all controller services.
  */
 std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> FlowController::getAllControllerServices() {
@@ -888,6 +897,13 @@ void FlowController::enableAllControllerServices() {
   controller_service_provider_->enableAllControllerServices();
 }
 
+/**
+ * Disables all controller services for the provider.
+ */
+void FlowController::disableAllControllerServices() {
+  // Forwarded to the underlying controller service provider.
+  controller_service_provider_->disableAllControllerServices();
+}
+
 int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration) {
   if (applyConfiguration(source, configuration)) {
     return 1;
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 6670a73..b71b8f7 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -25,6 +25,8 @@
 #include <string>
 #include <memory>
 #include "c2/ControllerSocketProtocol.h"
+#include "core/ProcessContext.h"
+#include "core/CoreComponentState.h"
 #include "core/state/UpdateController.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -431,6 +433,28 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse &resp) {
         update_sink_->drainRepositories();
         C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
         enqueue_c2_response(std::move(response));
+      } else if (resp.name == "corecomponentstate") {
+        // TODO(bakaid): untested
+        std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
+        auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
+        if (state_manager_provider != nullptr) {
+          for (auto &component : components) {
+            logger_->log_debug("Clearing state for component %s", component->getComponentName());
+            auto state_manager = state_manager_provider->getCoreComponentStateManager(component->getComponentUUID());
+            if (state_manager != nullptr) {
+              component->stop(true);
+              state_manager->clear();
+              state_manager->persist();
+              component->start();
+            } else {
+              logger_->log_warn("Failed to get StateManager for component %s", component->getComponentUUID());
+            }
+          }
+        } else {
+          logger_->log_error("Failed to get StateManagerProvider");
+        }
+        C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        enqueue_c2_response(std::move(response));
       } else {
         logger_->log_debug("Clearing unknown %s", resp.name);
       }
@@ -565,7 +589,30 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) {
         response.addPayload(std::move(options));
       }
       enqueue_c2_response(std::move(response));
+      return;
     }
+  } else if (resp.name == "corecomponentstate") {
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    response.setLabel("corecomponentstate");
+    C2Payload states(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    states.setLabel("corecomponentstate");
+    auto state_manager_provider = core::ProcessContext::getStateManagerProvider(logger_, controller_, configuration_);
+    if (state_manager_provider != nullptr) {
+      auto core_component_states = state_manager_provider->getAllCoreComponentStates();
+      for (const auto& core_component_state : core_component_states) {
+        C2Payload state(Operation::ACKNOWLEDGE, resp.ident, false, true);
+        state.setLabel(core_component_state.first);
+        for (const auto& kv : core_component_state.second) {
+          C2ContentResponse entry(Operation::ACKNOWLEDGE);
+          entry.name = kv.first;
+          entry.operation_arguments[kv.first] = kv.second;
+          state.addContent(std::move(entry));
+        }
+        states.addPayload(std::move(state));
+      }
+    }
+    response.addPayload(std::move(states));
+    enqueue_c2_response(std::move(response));
     return;
   }
   C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
diff --git a/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
new file mode 100644
index 0000000..6487eab
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.cpp
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+// Boolean switch, default false. onEnable() reads it into always_persist_; when it is true
+// the periodic persisting thread is not started.
+core::Property AbstractAutoPersistingKeyValueStoreService::AlwaysPersist(
+    core::PropertyBuilder::createProperty("Always Persist")
+        ->withDescription("Persist every change instead of persisting it periodically.")
+        ->isRequired(false)
+        ->withDefaultValue<bool>(false)
+        ->build());
+
+// Time period, default "1 min". onEnable() converts it to milliseconds and uses it as the
+// sleep interval of the persisting thread; a value of 0 disables that thread.
+core::Property AbstractAutoPersistingKeyValueStoreService::AutoPersistenceInterval(
+    core::PropertyBuilder::createProperty("Auto Persistence Interval")
+        ->withDescription("The interval of the periodic task persisting all values. "
+                          "Only used if Always Persist is false. "
+                          "If set to 0 seconds, auto persistence will be disabled.")
+        ->isRequired(false)
+        ->withDefaultValue<core::TimePeriodValue>("1 min")
+        ->build());
+
+/**
+ * Creates the service from a name and a string id.
+ *
+ * Starts in the "not persisting" state: always_persist_ is false, the interval is 0 and
+ * no thread is running. The persisting thread is only started later, by onEnable().
+ * NOTE(review): KeyValueStoreService is initialized explicitly next to
+ * PersistableKeyValueStoreService, presumably because it is a virtual base - confirm in the header.
+ */
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, const std::string& id)
+    : KeyValueStoreService(name, id)
+    , PersistableKeyValueStoreService(name, id)
+    , always_persist_(false)
+    , auto_persistence_interval_(0U)
+    , running_(false)
+    , logger_(logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger()) {
+}
+
+/**
+ * Creates the service from a name and a utils::Identifier.
+ *
+ * Initializes the same members to the same values as the string-id constructor; the
+ * persisting thread is only started later, by onEnable().
+ */
+AbstractAutoPersistingKeyValueStoreService::AbstractAutoPersistingKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : KeyValueStoreService(name, uuid)
+    , PersistableKeyValueStoreService(name, uuid)
+    , always_persist_(false)
+    , auto_persistence_interval_(0U)
+    , running_(false)
+    , logger_(logging::LoggerFactory<AbstractAutoPersistingKeyValueStoreService>::getLogger()) {
+}
+
+/**
+ * Stops and joins the persisting thread (if one is running) before the object goes away,
+ * so the thread never runs against a destroyed service.
+ */
+AbstractAutoPersistingKeyValueStoreService::~AbstractAutoPersistingKeyValueStoreService() {
+  stopPersistingThread();
+}
+
+/**
+ * Asks the persisting thread to exit and waits until it has finished.
+ * Does nothing when no thread is currently running (the thread object is not joinable).
+ */
+void AbstractAutoPersistingKeyValueStoreService::stopPersistingThread() {
+  std::unique_lock<std::mutex> guard(persisting_mutex_);
+  if (!persisting_thread_.joinable()) {
+    return;
+  }
+
+  // Clear the flag while holding the mutex so the worker's wait predicate sees the change.
+  running_ = false;
+  persisting_cv_.notify_one();
+  // The worker must reacquire the mutex to wake up, so release it before joining.
+  guard.unlock();
+  persisting_thread_.join();
+}
+
+/**
+ * Runs the base ControllerService initialization and then registers the two properties
+ * this class adds (Always Persist, Auto Persistence Interval) on top of whatever is
+ * already supported.
+ */
+void AbstractAutoPersistingKeyValueStoreService::initialize() {
+  ControllerService::initialize();
+  updateSupportedProperties({AlwaysPersist, AutoPersistenceInterval});
+}
+
+/**
+ * Reads the Always Persist and Auto Persistence Interval properties and, when periodic
+ * persistence is requested, starts the persisting thread.
+ *
+ * Returns early (after a debug log) when no configuration is available. A missing or
+ * unparsable property is only logged; the corresponding member keeps its current value.
+ * The thread is started only if Always Persist is false, the interval is non-zero and no
+ * thread is already running.
+ */
+void AbstractAutoPersistingKeyValueStoreService::onEnable() {
+  std::unique_lock<std::mutex> guard(persisting_mutex_);
+
+  if (configuration_ == nullptr) {
+    logger_->log_debug("Cannot enable AbstractAutoPersistingKeyValueStoreService");
+    return;
+  }
+
+  std::string raw;
+  if (getProperty(AlwaysPersist.getName(), raw)) {
+    utils::StringUtils::StringToBool(raw, always_persist_);
+  } else {
+    logger_->log_error("Always Persist attribute is missing or invalid");
+  }
+
+  if (getProperty(AutoPersistenceInterval.getName(), raw)) {
+    core::TimeUnit unit;
+    // Parse "<number> <unit>" and then normalize the number to milliseconds in place.
+    const bool interval_ok = core::Property::StringToTime(raw, auto_persistence_interval_, unit)
+        && core::Property::ConvertTimeUnitToMS(auto_persistence_interval_, unit, auto_persistence_interval_);
+    if (!interval_ok) {
+      logger_->log_error("Auto Persistence Interval attribute is invalid");
+    }
+  } else {
+    logger_->log_error("Auto Persistence Interval attribute is missing or invalid");
+  }
+
+  const bool periodic_persistence = !always_persist_ && auto_persistence_interval_ != 0U;
+  if (periodic_persistence && !persisting_thread_.joinable()) {
+    logger_->log_trace("Starting auto persistence thread");
+    // Set the flag before the thread exists so its first predicate check sees it.
+    running_ = true;
+    persisting_thread_ = std::thread(&AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc, this);
+  }
+
+  logger_->log_trace("Enabled AbstractAutoPersistingKeyValueStoreService");
+}
+
+/**
+ * Stop notification hook: shuts down and joins the persisting thread, if one is running.
+ */
+void AbstractAutoPersistingKeyValueStoreService::notifyStop() {
+  stopPersistingThread();
+}
+
+/**
+ * Body of the persisting thread.
+ *
+ * Repeatedly sleeps for auto_persistence_interval_ milliseconds and then calls persist().
+ * The sleep is a condition-variable wait, so stopPersistingThread() can interrupt it; the
+ * loop exits as soon as running_ is found to be false. persisting_mutex_ is held whenever
+ * the thread is not waiting, including during persist().
+ */
+void AbstractAutoPersistingKeyValueStoreService::persistingThreadFunc() {
+  std::unique_lock<std::mutex> guard(persisting_mutex_);
+
+  for (;;) {
+    // NOTE(review): "%d" is used for auto_persistence_interval_; if that member is a 64-bit
+    // unsigned value the specifier does not match - confirm against the header/logger.
+    logger_->log_trace("Persisting thread is going to sleep for %d ms", auto_persistence_interval_);
+    // wait_for with a predicate returns the predicate's final value: true means a stop was requested.
+    const bool stop_requested = persisting_cv_.wait_for(guard, std::chrono::milliseconds(auto_persistence_interval_), [this] {
+      return !running_;
+    });
+
+    if (stop_requested) {
+      logger_->log_trace("Stopping persistence thread");
+      return;
+    }
+
+    persist();
+  }
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
new file mode 100644
index 0000000..8d817d5
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/AbstractCoreComponentStateManagerProvider.cpp
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/AbstractCoreComponentStateManagerProvider.h"
+
+#include "rapidjson/rapidjson.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+#include <memory>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+
+/**
+ * Binds a state manager to one component id and loads that component's stored state.
+ *
+ * The state is considered valid only if the provider has an entry for the id and that
+ * entry deserializes successfully; otherwise the manager starts without valid state.
+ */
+AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::AbstractCoreComponentStateManager(
+    std::shared_ptr<AbstractCoreComponentStateManagerProvider> provider,
+    const std::string& id)
+    : provider_(std::move(provider))
+    , id_(id)
+    , state_valid_(false) {
+  std::string stored;
+  state_valid_ = provider_->getImpl(id_, stored) && provider_->deserialize(stored, state_);
+}
+
+/**
+ * Replaces the component's state with kvs.
+ *
+ * The serialized map is written through the provider first; the cached copy is updated
+ * (and marked valid) only when that write succeeds.
+ * @return true if the provider accepted the new state, false otherwise
+ */
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::set(const std::unordered_map<std::string, std::string>& kvs) {
+  if (!provider_->setImpl(id_, provider_->serialize(kvs))) {
+    return false;
+  }
+  state_valid_ = true;
+  state_ = kvs;
+  return true;
+}
+
+/**
+ * Copies the cached state into kvs.
+ * @return false (leaving kvs untouched) when there is no valid state, true otherwise
+ */
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::get(std::unordered_map<std::string, std::string>& kvs) {
+  if (state_valid_) {
+    kvs = state_;
+    return true;
+  }
+  return false;
+}
+
+/**
+ * Removes the component's state from the provider and drops the cached copy.
+ * @return false when there is no valid state to clear or the provider fails to remove
+ *         the entry (the cache is left as it was); true on success
+ */
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::clear() {
+  // Short-circuit: the provider is only asked to remove the entry when state is valid.
+  if (!state_valid_ || !provider_->removeImpl(id_)) {
+    return false;
+  }
+  state_valid_ = false;
+  state_.clear();
+  return true;
+}
+
+/**
+ * Asks the provider to persist its contents.
+ * @return false without calling the provider when this manager has no valid state;
+ *         otherwise the provider's result
+ */
+bool AbstractCoreComponentStateManagerProvider::AbstractCoreComponentStateManager::persist() {
+  return state_valid_ && provider_->persistImpl();
+}
+
+/* Defaulted destructor, defined out of line in this translation unit. */
+AbstractCoreComponentStateManagerProvider::~AbstractCoreComponentStateManagerProvider() = default;
+
+/**
+ * Creates a state manager for the component identified by uuid.
+ *
+ * The manager shares ownership of this provider via shared_from_this().
+ * NOTE(review): this presumably requires the provider to already be owned by a
+ * std::shared_ptr when the method is called - confirm against the callers.
+ */
+std::shared_ptr<core::CoreComponentStateManager> AbstractCoreComponentStateManagerProvider::getCoreComponentStateManager(const std::string& uuid) {
+  return std::make_shared<AbstractCoreComponentStateManager>(shared_from_this(), uuid);
+}
+
+/**
+ * Returns the deserialized state of every component known to the provider, keyed by
+ * the key under which it was stored.
+ *
+ * Entries that fail to deserialize are silently left out. An empty map is returned when
+ * the stored entries cannot be read at all.
+ */
+std::unordered_map<std::string, std::unordered_map<std::string, std::string>> AbstractCoreComponentStateManagerProvider::getAllCoreComponentStates() {
+  std::unordered_map<std::string, std::unordered_map<std::string, std::string>> states;
+
+  std::unordered_map<std::string, std::string> raw_states;
+  if (!getImpl(raw_states)) {
+    return states;
+  }
+
+  for (const auto& entry : raw_states) {
+    std::unordered_map<std::string, std::string> state;
+    if (!deserialize(entry.second, state)) {
+      continue;
+    }
+    states.emplace(entry.first, std::move(state));
+  }
+
+  return states;
+}
+
+/**
+ * Serializes a key/value map as a single JSON object whose members are all strings.
+ * @param kvs the state to serialize
+ * @return the JSON text
+ */
+std::string AbstractCoreComponentStateManagerProvider::serialize(const std::unordered_map<std::string, std::string>& kvs) {
+  rapidjson::Document json(rapidjson::kObjectType);
+  auto& allocator = json.GetAllocator();
+  for (const auto& entry : kvs) {
+    // StringRef does not copy: the document borrows the bytes owned by kvs, which outlives it.
+    const auto key = rapidjson::StringRef(entry.first.c_str(), entry.first.size());
+    const auto value = rapidjson::StringRef(entry.second.c_str(), entry.second.size());
+    json.AddMember(key, value, allocator);
+  }
+
+  rapidjson::StringBuffer out;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(out);
+  json.Accept(writer);
+
+  return out.GetString();
+}
+
+/**
+ * Parses a JSON object of string members (the format produced by serialize()) into kvs.
+ *
+ * @param serialized JSON text to parse
+ * @param kvs        receives the parsed pairs; replaced as a whole on success and left
+ *                   untouched on failure
+ * @return false if the text is not valid JSON, is not a JSON object, or contains a member
+ *         whose value is not a string; true otherwise
+ */
+bool AbstractCoreComponentStateManagerProvider::deserialize(const std::string& serialized, std::unordered_map<std::string, std::string>& kvs) {
+  rapidjson::StringStream stream(serialized.c_str());
+  rapidjson::Document doc;
+  rapidjson::ParseResult res = doc.ParseStream(stream);
+  if (!res || !doc.IsObject()) {
+    return false;
+  }
+
+  // Collect into a scratch map so a rejected document cannot leave kvs half-filled.
+  std::unordered_map<std::string, std::string> parsed;
+  for (const auto& kv : doc.GetObject()) {
+    // GetString() on a non-string value is a rapidjson assertion failure / undefined
+    // behaviour, so stored data of an unexpected shape is rejected instead.
+    if (!kv.value.IsString()) {
+      return false;
+    }
+    // Use the explicit lengths so strings containing an escaped NUL are not truncated.
+    parsed[std::string(kv.name.GetString(), kv.name.GetStringLength())] =
+        std::string(kv.value.GetString(), kv.value.GetStringLength());
+  }
+
+  kvs.swap(parsed);
+  return true;
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/windows-event-log/wel/UnicodeConversion.h b/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
similarity index 51%
copy from extensions/windows-event-log/wel/UnicodeConversion.h
copy to libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
index 01c48bc..a7176ce 100644
--- a/extensions/windows-event-log/wel/UnicodeConversion.h
+++ b/libminifi/src/controllers/keyvalue/KeyValueStoreService.cpp
@@ -1,7 +1,4 @@
 /**
- * @file UnicodeConversion.h
- * Unicode conversion functions
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -18,24 +15,39 @@
  * limitations under the License.
  */
 
-#pragma once
+#include "controllers/keyvalue/KeyValueStoreService.h"
 
-#include <string>
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
 
-#include <atlbase.h>
-#include <atlconv.h>
+/* Constructs the service from a name and a string id; both are forwarded to ControllerService. */
+KeyValueStoreService::KeyValueStoreService(const std::string& name, const std::string& id)
+    : ControllerService(name, id) {
+}
 
-namespace org {
-  namespace apache {
-    namespace nifi {
-      namespace minifi {
-        namespace wel {
-          inline std::string to_string(const wchar_t* pChar) {
-            ATL::CW2A aString(pChar, CP_UTF8);
-            return std::string(aString);
-          }
-        } /* namespace wel */
-      } /* namespace minifi */
-    } /* namespace nifi */
-  } /* namespace apache */
+/* Constructs the service from a name and a utils::Identifier; both are forwarded to ControllerService. */
+KeyValueStoreService::KeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : ControllerService(name, uuid) {
+}
+
+/* Nothing to release at this level. */
+KeyValueStoreService::~KeyValueStoreService() {
+}
+
+/* Intentionally a no-op. */
+void KeyValueStoreService::yield() {
+}
+
+/* The service counts as running exactly when its controller service state is ENABLED. */
+bool KeyValueStoreService::isRunning() {
+  return getState() == core::controller::ControllerServiceState::ENABLED;
+}
+
+/* Always reports that no work is available. */
+bool KeyValueStoreService::isWorkAvailable() {
+  return false;
+}
+
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
 } /* namespace org */
diff --git a/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
new file mode 100644
index 0000000..0a1c933
--- /dev/null
+++ b/libminifi/src/controllers/keyvalue/PersistableKeyValueStoreService.cpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace controllers {
+/* Constructs the service from a name and a string id; both are forwarded to KeyValueStoreService. */
+PersistableKeyValueStoreService::PersistableKeyValueStoreService(const std::string& name, const std::string& id)
+    : KeyValueStoreService(name, id) {
+}
+
+/* Constructs the service from a name and a utils::Identifier; both are forwarded to KeyValueStoreService. */
+PersistableKeyValueStoreService::PersistableKeyValueStoreService(const std::string& name, utils::Identifier uuid /*= utils::Identifier()*/)
+    : KeyValueStoreService(name, uuid) {
+}
+
+/* Nothing to release at this level. */
+PersistableKeyValueStoreService::~PersistableKeyValueStoreService() {
+}
+
+/*
+ * The *Impl functions below are thin adapters: each forwards to the key-value store
+ * operation of the same name and returns its result unchanged.
+ * NOTE(review): presumably these implement the storage hooks that
+ * AbstractCoreComponentStateManagerProvider calls (getImpl/setImpl/removeImpl/persistImpl) - confirm in the header.
+ */
+
+/* Stores value under key via set(). */
+bool PersistableKeyValueStoreService::setImpl(const std::string& key, const std::string& value) {
+  return set(key, value);
+}
+
+/* Looks up key via get() and writes the result into value. */
+bool PersistableKeyValueStoreService::getImpl(const std::string& key, std::string& value) {
+  return get(key, value);
+}
+
+/* Retrieves all stored pairs into kvs via get(). */
+bool PersistableKeyValueStoreService::getImpl(std::unordered_map<std::string, std::string>& kvs) {
+  return get(kvs);
+}
+
+/* Removes key via remove(). */
+bool PersistableKeyValueStoreService::removeImpl(const std::string& key) {
+  return remove(key);
+}
+
+/* Persists the store via persist(). */
+bool PersistableKeyValueStoreService::persistImpl() {
+  return persist();
+}
+
+} /* namespace controllers */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp
index 2ef6f0b..bdc92c4 100644
--- a/libminifi/src/core/ConfigurableComponent.cpp
+++ b/libminifi/src/core/ConfigurableComponent.cpp
@@ -195,6 +195,19 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties
   return true;
 }
 
+/**
+ * Adds the given properties to the supported set, overwriting any existing entry that
+ * has the same name. Properties that are not mentioned are left in place.
+ *
+ * @param properties properties to add or replace
+ * @return false if the component cannot currently be edited, true otherwise
+ */
+bool ConfigurableComponent::updateSupportedProperties(std::set<Property> properties) {
+  if (!canEdit()) {
+    return false;
+  }
+
+  std::lock_guard<std::mutex> lock(configuration_mutex_);
+
+  // Iterate by const reference: each Property is copied once (into properties_), not twice.
+  for (const auto& item : properties) {
+    properties_[item.getName()] = item;
+  }
+  return true;
+}
+
 bool ConfigurableComponent::getDynamicProperty(const std::string name, std::string &value) const {
   std::lock_guard<std::mutex> lock(configuration_mutex_);
 
diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp
index dafa6c5..b1e9cea 100644
--- a/libminifi/src/core/FlowConfiguration.cpp
+++ b/libminifi/src/core/FlowConfiguration.cpp
@@ -36,6 +36,10 @@ static_initializers &get_static_functions() {
 }
 
 FlowConfiguration::~FlowConfiguration() {
+  if (service_provider_ != nullptr) {
+    /* This is needed to counteract the StandardControllerServiceProvider <-> StandardControllerServiceNode shared_ptr cycle */
+    service_provider_->clearControllerServices();
+  }
 }
 
 std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) {
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 52cf39a..51dbad5 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -34,7 +34,7 @@ void LogTestController::setLevel(const std::string name, spdlog::level::level_en
 }
 
 TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
-                   const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration)
+                   const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration, const char* state_dir)
     : configuration_(configuration),
       content_repo_(content_repo),
       flow_repo_(flow_repo),
@@ -45,6 +45,23 @@ TestPlan::TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::s
       flow_version_(flow_version),
       logger_(logging::LoggerFactory<TestPlan>::getLogger()) {
   stream_factory = org::apache::nifi::minifi::io::StreamFactory::getInstance(std::make_shared<minifi::Configure>());
+  controller_services_ = std::make_shared<core::controller::ControllerServiceMap>();
+  controller_services_provider_ = std::make_shared<core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_);
+  /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */
+  if (state_dir == nullptr) {
+    char state_dir_name_template[] = "/var/tmp/teststate.XXXXXX";
+    state_dir_ = utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  } else {
+    state_dir_ = state_dir;
+  }
+  state_manager_provider_ = core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_services_provider_, configuration_, state_dir_.c_str());
+}
+
+/**
+ * Marks every configured processor as STOPPED and then clears the controller services
+ * held by the provider.
+ */
+TestPlan::~TestPlan() {
+  for (auto& processor : configured_processors_) {
+    processor->setScheduledState(core::ScheduledState::STOPPED);
+  }
+  /* NOTE(review): presumably needed for the same reason as in FlowConfiguration::~FlowConfiguration,
+   * i.e. to break the provider <-> node shared_ptr cycle - confirm. */
+  controller_services_provider_->clearControllerServices();
 }
 
 std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
@@ -151,6 +168,31 @@ std::shared_ptr<core::Processor> TestPlan::addProcessor(const std::string &proce
   return addProcessor(processor_name, uuid, name, relationships, linkToPrevious);
 }
 
+/**
+ * Creates a controller service of type controller_name, registers it with the plan and
+ * gives it a freshly generated UUID and the given name.
+ *
+ * The node is only recorded here; it is enabled later, when the plan is finalized.
+ * @return the new node, or nullptr if the plan is already finalized or the service
+ *         could not be created
+ */
+std::shared_ptr<core::controller::ControllerServiceNode> TestPlan::addController(const std::string &controller_name, const std::string &name) {
+  if (finalized) {
+    return nullptr;
+  }
+  std::lock_guard<std::recursive_mutex> lock(mutex);
+
+  utils::Identifier generated_id;
+  utils::IdGenerator::getIdGenerator()->generate(generated_id);
+
+  auto node = controller_services_provider_->createControllerService(controller_name, controller_name, name, true /*firstTimeAdded*/);
+  if (!node) {
+    return nullptr;
+  }
+
+  controller_service_nodes_.push_back(node);
+
+  node->initialize();
+  node->setUUID(generated_id);
+  node->setName(name);
+
+  return node;
+}
+
 bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   int32_t i = 0;
@@ -172,6 +214,16 @@ bool TestPlan::setProperty(const std::shared_ptr<core::Processor> proc, const st
   }
 }
 
+/**
+ * Sets a (dynamic) property on a controller service node and on the controller service
+ * implementation it wraps.
+ *
+ * @return the result of setting the property on the implementation; the result of
+ *         setting it on the node itself is not checked
+ */
+bool TestPlan::setProperty(const std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node, const std::string &prop, const std::string &value, bool dynamic /*= false*/) {
+  if (!dynamic) {
+    controller_service_node->setProperty(prop, value);
+    return controller_service_node->getControllerServiceImplementation()->setProperty(prop, value);
+  }
+  controller_service_node->setDynamicProperty(prop, value);
+  return controller_service_node->getControllerServiceImplementation()->setDynamicProperty(prop, value);
+}
+
 void TestPlan::reset(bool reschedule) {
   std::lock_guard<std::recursive_mutex> guard(mutex);
   process_sessions_.clear();
@@ -283,6 +335,10 @@ void TestPlan::finalize() {
     }
   }
 
+  for (auto& controller_service_node : controller_service_nodes_) {
+    controller_service_node->enable();
+  }
+
   finalized = true;
 }
 
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 73d1430..6a7fe95 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -44,6 +44,7 @@
 #include "core/ProcessContextBuilder.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessorNode.h"
+#include "core/controller/ControllerServiceNode.h"
 #include "core/reporting/SiteToSiteProvenanceReportingTask.h"
 #include "core/state/nodes/FlowInformation.h"
 #include "properties/Configure.h"
@@ -182,7 +183,6 @@ class LogTestController {
     TestBootstrapLogger(std::shared_ptr<spdlog::logger> logger)
         : Logger(logger) {
     }
-    ;
   };
   LogTestController()
       : LogTestController(nullptr) {
@@ -228,7 +228,9 @@ class TestPlan {
  public:
 
   explicit TestPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo,
-                    const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration);
+                    const std::shared_ptr<minifi::state::response::FlowVersion> &flow_version, const std::shared_ptr<minifi::Configure> &configuration, const char* state_dir);
+
+  virtual ~TestPlan();
 
   std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
                                                 core::Relationship relationship = core::Relationship("success", "description"), bool linkToPrevious = false) {
@@ -249,8 +251,12 @@ class TestPlan {
   std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, utils::Identifier& uuid, const std::string &name, const std::initializer_list<core::Relationship>& relationships,
                                                 bool linkToPrevious = false);
 
+  std::shared_ptr<core::controller::ControllerServiceNode> addController(const std::string &controller_name, const std::string &name);
+
   bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value, bool dynamic = false);
 
+  bool setProperty(const std::shared_ptr<core::controller::ControllerServiceNode> controller_service_node, const std::string &prop, const std::string &value, bool dynamic = false);
+
   void reset(bool reschedule = false);
 
   bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr);
@@ -277,10 +283,18 @@ class TestPlan {
     return logger_;
   }
 
- protected:
+  std::string getStateDir() {
+    return state_dir_;
+  }
+
+  std::shared_ptr<core::CoreComponentStateManagerProvider> getStateManagerProvider() {
+    return state_manager_provider_;
+  }
 
   void finalize();
 
+ protected:
+
   std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool setDest = false);
 
   std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;
@@ -292,8 +306,13 @@ class TestPlan {
   std::shared_ptr<core::Repository> flow_repo_;
   std::shared_ptr<core::Repository> prov_repo_;
 
+  std::shared_ptr<core::controller::ControllerServiceMap> controller_services_;
   std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;
 
+  std::shared_ptr<core::CoreComponentStateManagerProvider> state_manager_provider_;
+
+  std::string state_dir_;
+
   std::recursive_mutex mutex;
 
   std::atomic<bool> finalized;
@@ -303,6 +322,7 @@ class TestPlan {
   std::shared_ptr<core::FlowFile> current_flowfile_;
 
   std::shared_ptr<minifi::state::response::FlowVersion> flow_version_;
+  std::vector<std::shared_ptr<core::controller::ControllerServiceNode>> controller_service_nodes_;
   std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
   std::vector<std::shared_ptr<core::Processor>> processor_queue_;
   std::vector<std::shared_ptr<core::Processor>> configured_processors_;
@@ -330,7 +350,7 @@ class TestController {
     flow_version_ = std::make_shared<minifi::state::response::FlowVersion>("test", "test", "test");
   }
 
-  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr) {
+  std::shared_ptr<TestPlan> createPlan(std::shared_ptr<minifi::Configure> configuration = nullptr, const char* state_dir = nullptr) {
     if (configuration == nullptr) {
       configuration = std::make_shared<minifi::Configure>();
     }
@@ -340,7 +360,7 @@ class TestController {
 
     std::shared_ptr<core::Repository> flow_repo = std::make_shared<TestRepository>();
     std::shared_ptr<core::Repository> repo = std::make_shared<TestRepository>();
-    return std::make_shared<TestPlan>(content_repo, flow_repo, repo, flow_version_, configuration);
+    return std::make_shared<TestPlan>(content_repo, flow_repo, repo, flow_version_, configuration, state_dir);
   }
 
   void runSession(std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, std::function<void(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&)> verify =
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 91bafe8..3310b97 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -89,6 +89,7 @@ class IntegrationBase {
   uint64_t wait_time_;
   std::string port, scheme, path;
   std::string key_dir;
+  std::string state_dir;
 };
 
 IntegrationBase::IntegrationBase(uint64_t waitTime)
@@ -122,6 +123,11 @@ void IntegrationBase::run(std::string test_file_location) {
 
   core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
 
+  auto controller_service_provider = yaml_ptr->getControllerServiceProvider();
+  char state_dir_name_template[] = "/var/tmp/integrationstate.XXXXXX";
+  state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template);
+  core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str());
+
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
 
diff --git a/libminifi/test/keyvalue-tests/CMakeLists.txt b/libminifi/test/keyvalue-tests/CMakeLists.txt
new file mode 100644
index 0000000..dfafebe
--- /dev/null
+++ b/libminifi/test/keyvalue-tests/CMakeLists.txt
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Every .cpp file in this directory is built as its own test executable.
+file(GLOB KEYVALUE_TESTS "*.cpp")
+
+# Number of executables configured below; only used for the summary message.
+set(KEYVALUE_INT_TEST_COUNT 0)
+
+foreach(testfile ${KEYVALUE_TESTS})
+    # The target is named after the source file, without directory or extension.
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+    createTests("${testfilename}")
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    # The RocksDB-backed extension is linked in only when RocksDB support is enabled.
+    if(NOT DISABLE_ROCKSDB)
+        target_wholearchive_library(${testfilename} minifi-rocksdb-repos)
+    endif()
+
+    math(EXPR KEYVALUE_INT_TEST_COUNT "${KEYVALUE_INT_TEST_COUNT}+1")
+endforeach()
+
+message("-- Finished building ${KEYVALUE_INT_TEST_COUNT} keyvalue controller test file(s)...")
+
+# The same test binary is registered once per backing store; the YAML file passed via
+# --config-yaml supplies the controller service configuration under test.
+add_test(NAME UnorderedMapPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/UnorderedMapPersistableKeyValueStoreServiceTest.yml")
+if(NOT DISABLE_ROCKSDB)
+    # NOTE(review): "RocksdDb" in the test name looks like a typo for "RocksDb"; left as is
+    # in case anything selects the test by name.
+    add_test(NAME RocksdDbPersistableKeyValueStoreServiceTest COMMAND PersistableKeyValueStoreServiceTest --config-yaml "${TEST_RESOURCES}/RocksDbPersistableKeyValueStoreServiceTest.yml")
+endif()
diff --git a/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
new file mode 100644
index 0000000..a30de53
--- /dev/null
+++ b/libminifi/test/keyvalue-tests/PersistableKeyValueStoreServiceTest.cpp
@@ -0,0 +1,254 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define CATCH_CONFIG_RUNNER
+#include "catch.hpp"
+
+#include <vector>
+#include <memory>
+#include <utility>
+#include <string>
+#include <array>
+#include "../TestBase.h"
+#include "../../controller/Controller.h"
+#include "core/controller/ControllerService.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "controllers/keyvalue/PersistableKeyValueStoreService.h"
+#include "controllers/keyvalue/AbstractAutoPersistingKeyValueStoreService.h"
+
+static std::string config_yaml; // NOLINT
+
+/**
+ * Callback bound to the --config-yaml command line option.
+ * Stores the supplied path in the file-level config_yaml variable; the
+ * Catch::ConfigData argument is not used.
+ */
+static inline void configYamlHandler(Catch::ConfigData& /*unused*/, const std::string& path) {
+  config_yaml.assign(path);
+}
+
+/**
+ * Custom Catch entry point.
+ *
+ * Registers the extra --config-yaml option, parses the command line and runs
+ * the Catch session. Returns the parser's status if parsing fails, -1 if no
+ * configuration path was supplied, and the result of session.run() otherwise.
+ */
+int main(int argc, char* argv[]) {
+  Catch::Session session;
+
+  // session.cli() is cast to a non-const reference so that the extra option can be added to it.
+  auto& command_line = const_cast<Catch::Clara::CommandLine<Catch::ConfigData>&>(session.cli());
+  command_line["--config-yaml"]
+      .describe("path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration")
+      .bind(&configYamlHandler, "path");
+
+  const int parse_status = session.applyCommandLine(argc, argv);
+  if (parse_status != 0) {
+    return parse_status;
+  }
+
+  if (!config_yaml.empty()) {
+    return session.run();
+  }
+
+  // Without a configuration path there is no controller service to test.
+  std::cerr << "Missing --config-yaml <path>. It must contain the path to the config.yaml containing the PersistableKeyValueStoreService controller service configuration." << std::endl;
+  return -1;
+}
+
+/**
+ * Catch fixture shared by all test cases in this file.
+ *
+ * On construction it switches into a freshly created temporary directory and
+ * calls loadYaml(), which parses the flow configuration named by config_yaml,
+ * enables the controller service called "testcontroller" and exposes it through
+ * the `controller` member as a PersistableKeyValueStoreService.
+ */
+class PersistableKeyValueStoreServiceTestsFixture {
+ public:
+  PersistableKeyValueStoreServiceTestsFixture() {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    LogTestController::getInstance().setTrace<minifi::controllers::PersistableKeyValueStoreService>();
+    LogTestController::getInstance().setTrace<minifi::controllers::AbstractAutoPersistingKeyValueStoreService>();
+
+    // Create a temporary directory and make it the working directory.
+    // NOTE(review): presumably so that relative paths in the YAML resolve inside it - confirm.
+    char dir_template[] = "/var/tmp/state.XXXXXX";
+    state_dir = testController.createTempDirectory(dir_template);
+    REQUIRE_FALSE(state_dir.empty());
+    REQUIRE(chdir(state_dir.c_str()) == 0);
+
+    loadYaml();
+  }
+
+  virtual ~PersistableKeyValueStoreServiceTestsFixture() {
+    LogTestController::getInstance().reset();
+  }
+
+  /**
+   * (Re)builds every object of the fixture from the configuration file.
+   * Anything left over from a previous call is released first, so test cases
+   * call this after persist() to obtain a brand new controller service instance.
+   */
+  void loadYaml() {
+    // Drop the previous objects; the release order is kept exactly as it was.
+    controller = nullptr;
+    persistable_key_value_store_service_node = nullptr;
+
+    process_group = nullptr;
+    yaml_config = nullptr;
+
+    stream_factory = nullptr;
+    content_repo = nullptr;
+    test_flow_repo = nullptr;
+    test_repo = nullptr;
+    configuration = nullptr;
+
+    configuration = std::make_shared<minifi::Configure>();
+    test_repo = std::make_shared<TestRepository>();
+    test_flow_repo = std::make_shared<TestFlowRepository>();
+
+    configuration->set(minifi::Configure::nifi_flow_configuration_file, config_yaml);
+
+    content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+    content_repo->initialize(configuration);
+    stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+
+    // NOTE(review): test_repo is passed twice and test_flow_repo is not passed at all - confirm this is intended.
+    yaml_config.reset(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, config_yaml));
+
+    process_group = yaml_config->getRoot(config_yaml);
+    persistable_key_value_store_service_node = process_group->findControllerService("testcontroller");
+    REQUIRE(persistable_key_value_store_service_node != nullptr);
+    persistable_key_value_store_service_node->enable();
+
+    // The configured service must implement PersistableKeyValueStoreService.
+    auto service_impl = persistable_key_value_store_service_node->getControllerServiceImplementation();
+    controller = std::dynamic_pointer_cast<minifi::controllers::PersistableKeyValueStoreService>(service_impl);
+    REQUIRE(controller != nullptr);
+  }
+
+ protected:
+  // Temporary working directory created in the constructor.
+  std::string state_dir;
+  std::shared_ptr<minifi::Configure> configuration;
+  std::shared_ptr<core::Repository> test_repo;
+  std::shared_ptr<core::Repository> test_flow_repo;
+  std::shared_ptr<core::ContentRepository> content_repo;
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory;
+
+  std::unique_ptr<core::YamlConfiguration> yaml_config;
+  std::unique_ptr<core::ProcessGroup> process_group;
+
+  // Node found under the name "testcontroller" and the service implementation it wraps.
+  std::shared_ptr<core::controller::ControllerServiceNode> persistable_key_value_store_service_node;
+  std::shared_ptr<minifi::controllers::PersistableKeyValueStoreService> controller;
+
+  TestController testController;
+};
+
+
+// A stored pair must be readable again, both from the live service and after
+// persist() followed by a full reload of the configuration.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get", "[basic]") {
+  const char* stored_key = "foobar";
+  const char* stored_value = "234";
+  REQUIRE(controller->set(stored_key, stored_value));
+
+  SECTION("without persistence") {
+    // Intentionally empty: the checks below run against the live service.
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(stored_key, fetched));
+  REQUIRE(fetched == stored_value);
+}
+
+// Keys and values containing brackets, '=', backslashes, quotes and newlines
+// must be stored and returned unchanged, with and without persistence.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture special characters", "[basic]") {
+  const char* tricky_key = "[]{}()==\\=\n\n";
+  const char* tricky_value = ":./'\\=!\n=[]{}()";
+  REQUIRE(controller->set(tricky_key, tricky_value));
+
+  SECTION("without persistence") {
+    // Intentionally empty: the checks below run against the live service.
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(tricky_key, fetched));
+  REQUIRE(fetched == tricky_value);
+}
+
+// Several pairs are stored one by one; the bulk get() must then return exactly
+// that set of pairs, with and without persistence.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and get all", "[basic]") {
+  const std::unordered_map<std::string, std::string> expected_entries = {
+      {"foobar", "234"},
+      {"buzz", "value"},
+  };
+  for (const auto& entry : expected_entries) {
+    REQUIRE(controller->set(entry.first, entry.second));
+  }
+
+  SECTION("without persistence") {
+    // Intentionally empty: the checks below run against the live service.
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::unordered_map<std::string, std::string> stored_entries;
+  REQUIRE(controller->get(stored_entries));
+  REQUIRE(stored_entries == expected_entries);
+}
+
+// Setting a key twice must leave only the second value behind, with and
+// without persistence.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and overwrite", "[basic]") {
+  const char* stored_key = "foobar";
+  const char* first_value = "234";
+  const char* second_value = "baz";
+  REQUIRE(controller->set(stored_key, first_value));
+  REQUIRE(controller->set(stored_key, second_value));
+
+  SECTION("without persistence") {
+    // Intentionally empty: the checks below run against the live service.
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE(controller->get(stored_key, fetched));
+  REQUIRE(fetched == second_value);
+}
+
+// A key that was stored and then removed must no longer be retrievable, with
+// and without persistence.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and remove", "[basic]") {
+  const char* stored_key = "foobar";
+  const char* stored_value = "234";
+  REQUIRE(controller->set(stored_key, stored_value));
+  REQUIRE(controller->remove(stored_key));
+
+  SECTION("without persistence") {
+    // Intentionally empty: the checks below run against the live service.
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  std::string fetched;
+  REQUIRE_FALSE(controller->get(stored_key, fetched));
+}
+
+
+// After clear() the store must report no entries (from the live service and
+// after persist() plus a reload) and must still accept new entries.
+TEST_CASE_METHOD(PersistableKeyValueStoreServiceTestsFixture, "PersistableKeyValueStoreServiceTestsFixture set and clear", "[basic]") {
+  const std::unordered_map<std::string, std::string> kvs = {
+      {"foobar", "234"},
+      {"buzz", "value"},
+  };
+  for (const auto& kv : kvs) {
+    REQUIRE(true == controller->set(kv.first, kv.second));
+  }
+  REQUIRE(true == controller->clear());
+
+  SECTION("without persistence") {
+  }
+  SECTION("with persistence") {
+    controller->persist();
+    loadYaml();
+  }
+
+  // Read the store back before checking that it is empty. Without this call the
+  // size check would only look at a freshly constructed local map and could
+  // never fail, so clear() would not actually be verified.
+  std::unordered_map<std::string, std::string> kvs_res;
+  REQUIRE(true == controller->get(kvs_res));
+  REQUIRE(0U == kvs_res.size());
+
+  /* Make sure we can still insert after we cleared */
+  const char* key = "foo";
+  const char* value = "bar";
+  REQUIRE(true == controller->set(key, value));
+  std::string res;
+  REQUIRE(true == controller->get(key, res));
+  REQUIRE(value == res);
+}
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
similarity index 63%
copy from extensions/rocksdb-repos/CMakeLists.txt
copy to libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
index 3c5395e..5819063 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/libminifi/test/resources/RocksDbPersistableKeyValueStoreServiceTest.yml
@@ -16,16 +16,21 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+Flow Controller:
+    name: MiNiFi Flow
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+Processors: []
 
-file(GLOB SOURCES  "*.cpp")
+Connections: []
 
-add_library(minifi-rocksdb-repos STATIC ${SOURCES})
-set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+Controller Services:
+    - name: testcontroller
+      id: 2438e3c8-015a-1000-79ca-83af40ec1994
+      class: RocksDbPersistableKeyValueStoreService
+      Properties:
+        Auto Persistence Interval:
+            - value: 0 sec
+        Directory:
+            - value: state
 
-target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-rocksdb-repos RocksDB::RocksDB)
-
-SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
-register_extension(minifi-rocksdb-repos)
+Remote Processing Groups: []
diff --git a/libminifi/test/resources/TestC2DescribeCoreComponentState.yml b/libminifi/test/resources/TestC2DescribeCoreComponentState.yml
new file mode 100644
index 0000000..8565917
--- /dev/null
+++ b/libminifi/test/resources/TestC2DescribeCoreComponentState.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+Flow Controller:
+  name: MiNiFi Flow
+  id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+  - name: TailFile1
+    id: 2438e3c8-015a-1000-79ca-83af40ec1993
+    class: org.apache.nifi.processors.standard.TailFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 1 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list: success
+  - name: TailFile2
+    id: 2438e3c8-015a-1000-79ca-83af40ec1994
+    class: org.apache.nifi.processors.standard.TailFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 1 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list: success
+
+Connections: []
+
+Remote Processing Groups: []
diff --git a/extensions/rocksdb-repos/CMakeLists.txt b/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
similarity index 63%
copy from extensions/rocksdb-repos/CMakeLists.txt
copy to libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
index 3c5395e..61d8f26 100644
--- a/extensions/rocksdb-repos/CMakeLists.txt
+++ b/libminifi/test/resources/UnorderedMapPersistableKeyValueStoreServiceTest.yml
@@ -16,16 +16,21 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+Flow Controller:
+    name: MiNiFi Flow
 
-include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) 
+Processors: []
 
-file(GLOB SOURCES  "*.cpp")
+Connections: []
 
-add_library(minifi-rocksdb-repos STATIC ${SOURCES})
-set_property(TARGET minifi-rocksdb-repos PROPERTY POSITION_INDEPENDENT_CODE ON)
+Controller Services:
+    - name: testcontroller
+      id: 2438e3c8-015a-1000-79ca-83af40ec1994
+      class: UnorderedMapPersistableKeyValueStoreService
+      Properties:
+        Auto Persistence Interval:
+            - value: 0 sec
+        File:
+            - value: state.txt
 
-target_link_libraries(minifi-rocksdb-repos ${LIBMINIFI} Threads::Threads)
-target_link_libraries(minifi-rocksdb-repos RocksDB::RocksDB)
-
-SET (ROCKSDB-REPOS minifi-rocksdb-repos PARENT_SCOPE)
-register_extension(minifi-rocksdb-repos)
+Remote Processing Groups: []