You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2023/02/14 16:18:41 UTC

[nifi-minifi-cpp] 01/03: MINIFICPP-2024 Refactor repository hierarchy

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 556407c86bfb641d1b64796105dc583e8f5dca25
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue Feb 14 16:31:30 2023 +0100

    MINIFICPP-2024 Refactor repository hierarchy
    
    We would like to handle all repositories the same way for reporting
    metrics in MINIFICPP-2022, but the current hierarchy does not make
    that easy.
    
    The inheritance hierarchy of repositories is unnecessarily complex.
    
    While the base of the flow repositories is core::Repository and the base
    of content repositories is core::ContentRepository, the
    VolatileContentRepository inherits from both, which does not seem logical.
    
    The VolatileRepository is a template class that only works with two
    specific sets of template arguments and has unnecessary implementations of
    base class methods that are not valid for all child volatile repositories.
    
    There is also virtual inheritance from core::SerializableComponent in the
    flow repositories, which should not need to be virtual.
    
    This hierarchy should be cleaned up so that a single interface can be
    created for all repositories for metric reporting.
    
    Closes #1485
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .../rocksdb-repos/DatabaseContentRepository.h      |  38 +--
 extensions/rocksdb-repos/FlowFileRepository.cpp    | 117 +++++++++
 extensions/rocksdb-repos/FlowFileRepository.h      | 154 ++---------
 extensions/rocksdb-repos/ProvenanceRepository.cpp  | 135 ++++++++++
 extensions/rocksdb-repos/ProvenanceRepository.h    | 155 ++---------
 libminifi/include/core/ContentRepository.h         |  14 +-
 libminifi/include/core/Repository.h                |  41 +--
 .../include/core/repository/FileSystemRepository.h |  14 +-
 .../core/repository/VolatileContentRepository.h    |  31 +--
 .../core/repository/VolatileFlowFileRepository.h   |  49 ++--
 .../core/repository/VolatileProvenanceRepository.h |  24 +-
 .../include/core/repository/VolatileRepository.h   | 287 ++-------------------
 .../core/repository/VolatileRepositoryData.h       |  49 ++++
 libminifi/src/core/RepositoryFactory.cpp           |   6 +-
 .../core/repository/VolatileContentRepository.cpp  |  16 +-
 .../src/core/repository/VolatileRepository.cpp     | 148 ++++++++++-
 .../src/core/repository/VolatileRepositoryData.cpp |  74 ++++++
 libminifi/test/flow-tests/SessionTests.cpp         |   1 -
 .../test/persistence-tests/PersistenceTests.cpp    |   1 -
 libminifi/test/rocksdb-tests/RepoTests.cpp         |   6 +-
 libminifi/test/rocksdb-tests/SwapTests.cpp         |   1 -
 libminifi/test/unit/ProcessSessionTests.cpp        |   2 +-
 libminifi/test/unit/ProvenanceTestHelper.h         |  14 +-
 libminifi/test/unit/SwapTestController.h           |   3 -
 24 files changed, 636 insertions(+), 744 deletions(-)

diff --git a/extensions/rocksdb-repos/DatabaseContentRepository.h b/extensions/rocksdb-repos/DatabaseContentRepository.h
index f00bc7905..9961d2607 100644
--- a/extensions/rocksdb-repos/DatabaseContentRepository.h
+++ b/extensions/rocksdb-repos/DatabaseContentRepository.h
@@ -20,8 +20,6 @@
 #include <string>
 #include <utility>
 
-#include "core/Core.h"
-#include "core/Connectable.h"
 #include "core/ContentRepository.h"
 #include "core/BufferedContentSession.h"
 #include "core/logging/LoggerConfiguration.h"
@@ -31,10 +29,7 @@
 
 namespace org::apache::nifi::minifi::core::repository {
 
-/**
- * DatabaseContentRepository is a content repository that stores data onto the local file system.
- */
-class DatabaseContentRepository : public core::ContentRepository, public core::Connectable {
+class DatabaseContentRepository : public core::ContentRepository {
   class Session : public BufferedContentSession {
    public:
     explicit Session(std::shared_ptr<ContentRepository> repository);
@@ -46,10 +41,10 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
   static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.database.content.repository.encryption.key";
 
   explicit DatabaseContentRepository(std::string name = getClassName<DatabaseContentRepository>(), const utils::Identifier& uuid = {})
-      : core::Connectable(std::move(name), uuid),
-        is_valid_(false),
-        db_(nullptr),
-        logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
+    : core::ContentRepository(std::move(name), uuid),
+      is_valid_(false),
+      db_(nullptr),
+      logger_(logging::LoggerFactory<DatabaseContentRepository>::getLogger()) {
   }
   ~DatabaseContentRepository() override {
     stop();
@@ -60,13 +55,9 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
 
   std::shared_ptr<ContentSession> createSession() override;
-
   bool initialize(const std::shared_ptr<minifi::Configure> &configuration) override;
-
   void stop();
-
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append = false) override;
-
   std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim &claim) override;
 
   bool close(const minifi::ResourceClaim &claim) override {
@@ -74,27 +65,8 @@ class DatabaseContentRepository : public core::ContentRepository, public core::C
   }
 
   bool remove(const minifi::ResourceClaim &claim) override;
-
   bool exists(const minifi::ResourceClaim &streamId) override;
 
-  void yield() override {
-  }
-
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() const override {
-    return true;
-  }
-
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
-  }
-
  private:
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim &claim, bool append, minifi::internal::WriteBatch* batch);
 
diff --git a/extensions/rocksdb-repos/FlowFileRepository.cpp b/extensions/rocksdb-repos/FlowFileRepository.cpp
index 15487359a..690a797a8 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.cpp
+++ b/extensions/rocksdb-repos/FlowFileRepository.cpp
@@ -257,6 +257,123 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi
   initialize_repository();
 }
 
+bool FlowFileRepository::initialize(const std::shared_ptr<Configure> &configure) {
+  config_ = configure;
+  std::string value;
+
+  if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) {
+    directory_ = value;
+  }
+  logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
+
+  value.clear();
+  if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) {
+    checkpoint_dir_ = value;
+  }
+  logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string());
+
+  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
+  logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
+
+  auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
+    options.set(&rocksdb::DBOptions::create_if_missing, true);
+    options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
+    options.set(&rocksdb::DBOptions::use_direct_reads, true);
+    if (encrypted_env) {
+      options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
+    } else {
+      options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
+    }
+  };
+
+  // Write buffers are used as db operation logs. When they get filled the events are merged and serialized.
+  // The default size is 64MB.
+  // In our case it's usually too much, causing sawtooth in memory consumption. (Consumes more than the whole MiniFi)
+  // To avoid DB write issues during heavy load it's recommended to have high number of buffer.
+  // Rocksdb's stall feature can also trigger in case the number of buffers is >= 3.
+  // The more buffers we have the more memory rocksdb can utilize without significant memory consumption under low load.
+  auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
+    cf_opts.OptimizeForPointLookup(4);
+    cf_opts.write_buffer_size = 8ULL << 20U;
+    cf_opts.max_write_buffer_number = 20;
+    cf_opts.min_write_buffer_number_to_merge = 1;
+  };
+  db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_);
+  if (db_->open()) {
+    logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
+    return true;
+  } else {
+    logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
+    return false;
+  }
+}
+
+bool FlowFileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
+  // persistent to the DB
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  rocksdb::Slice value((const char *) buf, bufLen);
+  auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); };
+  return ExecuteWithRetry(operation);
+}
+
+bool FlowFileRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  auto batch = opendb->createWriteBatch();
+  for (const auto &item : data) {
+    const auto buf = item.second->getBuffer().as_span<const char>();
+    rocksdb::Slice value(buf.data(), buf.size());
+    if (!batch.Put(item.first, value).ok()) {
+      logger_->log_error("Failed to add item to batch operation");
+      return false;
+    }
+  }
+  auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
+  return ExecuteWithRetry(operation);
+}
+
+bool FlowFileRepository::Delete(const std::string& key) {
+  keys_to_delete.enqueue(key);
+  return true;
+}
+
+bool FlowFileRepository::Get(const std::string &key, std::string &value) {
+  auto opendb = db_->open();
+  if (!opendb) {
+    return false;
+  }
+  return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
+}
+
+bool FlowFileRepository::start() {
+  const bool ret = ThreadedRepository::start();
+  if (swap_loader_) {
+    swap_loader_->start();
+  }
+  return ret;
+}
+
+bool FlowFileRepository::stop() {
+  if (swap_loader_) {
+    swap_loader_->stop();
+  }
+  return ThreadedRepository::stop();
+}
+
+void FlowFileRepository::store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>> flow_files) {
+  gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
+  // pass, flowfiles are already persisted in the repository
+}
+
+std::future<std::vector<std::shared_ptr<core::FlowFile>>> FlowFileRepository::load(std::vector<SwappedFlowFile> flow_files) {
+  return swap_loader_->load(std::move(flow_files));
+}
+
 REGISTER_RESOURCE_AS(FlowFileRepository, InternalResource, ("FlowFileRepository", "flowfilerepository"));
 
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/extensions/rocksdb-repos/FlowFileRepository.h b/extensions/rocksdb-repos/FlowFileRepository.h
index 812bde72d..aeda5af9c 100644
--- a/extensions/rocksdb-repos/FlowFileRepository.h
+++ b/extensions/rocksdb-repos/FlowFileRepository.h
@@ -62,22 +62,18 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
   static constexpr const char* ENCRYPTION_KEY_NAME = "nifi.flowfile.repository.encryption.key";
 
   FlowFileRepository(std::string name, const utils::Identifier& /*uuid*/)
-      : FlowFileRepository(std::move(name)) {
+    : FlowFileRepository(std::move(name)) {
   }
 
   explicit FlowFileRepository(const std::string& repo_name = "",
-                     std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
-                     std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
-                     std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
-                     int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
-                     std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
-        checkpoint_dir_(std::move(checkpoint_dir)),
-        content_repo_(nullptr),
-        checkpoint_(nullptr),
-        logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
-    db_ = nullptr;
+                              std::filesystem::path checkpoint_dir = FLOWFILE_CHECKPOINT_DIRECTORY,
+                              std::string directory = FLOWFILE_REPOSITORY_DIRECTORY,
+                              std::chrono::milliseconds maxPartitionMillis = MAX_FLOWFILE_REPOSITORY_ENTRY_LIFE_TIME,
+                              int64_t maxPartitionBytes = MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE,
+                              std::chrono::milliseconds purgePeriod = FLOWFILE_REPOSITORY_PURGE_PERIOD)
+    : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<FlowFileRepository>(), std::move(directory), maxPartitionMillis, maxPartitionBytes, purgePeriod),
+      checkpoint_dir_(std::move(checkpoint_dir)),
+      logger_(logging::LoggerFactory<FlowFileRepository>::getLogger()) {
   }
 
   ~FlowFileRepository() override {
@@ -96,132 +92,18 @@ class FlowFileRepository : public ThreadedRepository, public SwapManager {
 
   virtual void printStats();
 
-  bool initialize(const std::shared_ptr<Configure> &configure) override {
-    config_ = configure;
-    std::string value;
+  bool initialize(const std::shared_ptr<Configure> &configure) override;
 
-    if (configure->get(Configure::nifi_flowfile_repository_directory_default, value) && !value.empty()) {
-      directory_ = value;
-    }
-    logger_->log_debug("NiFi FlowFile Repository Directory %s", directory_);
-
-    value.clear();
-    if (configure->get(Configure::nifi_flowfile_checkpoint_directory_default, value) && !value.empty()) {
-      checkpoint_dir_ = value;
-    }
-    logger_->log_debug("NiFi FlowFile Checkpoint Directory %s", checkpoint_dir_.string());
-
-    const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configure->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
-    logger_->log_info("Using %s FlowFileRepository", encrypted_env ? "encrypted" : "plaintext");
-
-    auto db_options = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& options) {
-      options.set(&rocksdb::DBOptions::create_if_missing, true);
-      options.set(&rocksdb::DBOptions::use_direct_io_for_flush_and_compaction, true);
-      options.set(&rocksdb::DBOptions::use_direct_reads, true);
-      if (encrypted_env) {
-        options.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
-      } else {
-        options.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
-      }
-    };
-
-    // Write buffers are used as db operation logs. When they get filled the events are merged and serialized.
-    // The default size is 64MB.
-    // In our case it's usually too much, causing sawtooth in memory consumption. (Consumes more than the whole MiniFi)
-    // To avoid DB write issues during heavy load it's recommended to have high number of buffer.
-    // Rocksdb's stall feature can also trigger in case the number of buffers is >= 3.
-    // The more buffers we have the more memory rocksdb can utilize without significant memory consumption under low load.
-    auto cf_options = [] (rocksdb::ColumnFamilyOptions& cf_opts) {
-      cf_opts.OptimizeForPointLookup(4);
-      cf_opts.write_buffer_size = 8ULL << 20U;
-      cf_opts.max_write_buffer_number = 20;
-      cf_opts.min_write_buffer_number_to_merge = 1;
-    };
-    db_ = minifi::internal::RocksDatabase::create(db_options, cf_options, directory_);
-    if (db_->open()) {
-      logger_->log_debug("NiFi FlowFile Repository database open %s success", directory_);
-      return true;
-    } else {
-      logger_->log_error("NiFi FlowFile Repository database open %s fail", directory_);
-      return false;
-    }
-  }
-
-  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override {
-    // persistent to the DB
-    auto opendb = db_->open();
-    if (!opendb) {
-      return false;
-    }
-    rocksdb::Slice value((const char *) buf, bufLen);
-    auto operation = [&key, &value, &opendb]() { return opendb->Put(rocksdb::WriteOptions(), key, value); };
-    return ExecuteWithRetry(operation);
-  }
-
-  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
-    auto opendb = db_->open();
-    if (!opendb) {
-      return false;
-    }
-    auto batch = opendb->createWriteBatch();
-    for (const auto &item : data) {
-      const auto buf = item.second->getBuffer().as_span<const char>();
-      rocksdb::Slice value(buf.data(), buf.size());
-      if (!batch.Put(item.first, value).ok()) {
-        logger_->log_error("Failed to add item to batch operation");
-        return false;
-      }
-    }
-    auto operation = [&batch, &opendb]() { return opendb->Write(rocksdb::WriteOptions(), &batch); };
-    return ExecuteWithRetry(operation);
-  }
-
-  /**
-   * Deletes the key
-   * @return status of the delete operation
-   */
-  bool Delete(const std::string& key) override {
-    keys_to_delete.enqueue(key);
-    return true;
-  }
-
-  /**
-   * Sets the value from the provided key
-   * @return status of the get operation.
-   */
-  bool Get(const std::string &key, std::string &value) override {
-    auto opendb = db_->open();
-    if (!opendb) {
-      return false;
-    }
-    return opendb->Get(rocksdb::ReadOptions(), key, &value).ok();
-  }
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override;
+  bool Delete(const std::string& key) override;
+  bool Get(const std::string &key, std::string &value) override;
 
   void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override;
-
-  bool start() override {
-    const bool ret = ThreadedRepository::start();
-    if (swap_loader_) {
-      swap_loader_->start();
-    }
-    return ret;
-  }
-
-  bool stop() override {
-    if (swap_loader_) {
-      swap_loader_->stop();
-    }
-    return ThreadedRepository::stop();
-  }
-
-  void store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>> flow_files) override {
-    gsl_Expects(ranges::all_of(flow_files, &FlowFile::isStored));
-    // pass, flowfiles are already persisted in the repository
-  }
-
-  std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<SwappedFlowFile> flow_files) override {
-    return swap_loader_->load(std::move(flow_files));
-  }
+  bool start() override;
+  bool stop() override;
+  void store([[maybe_unused]] std::vector<std::shared_ptr<core::FlowFile>> flow_files) override;
+  std::future<std::vector<std::shared_ptr<core::FlowFile>>> load(std::vector<SwappedFlowFile> flow_files) override;
 
  private:
   void run() override;
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index d4619ad6c..0c7da870d 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -50,6 +50,141 @@ void ProvenanceRepository::run() {
   }
 }
 
+bool ProvenanceRepository::initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) {
+  std::string value;
+  if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) {
+    directory_ = value;
+  }
+  logger_->log_debug("MiNiFi Provenance Repository Directory %s", directory_);
+  if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
+    core::Property::StringToInt(value, max_partition_bytes_);
+  }
+  logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
+  if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
+    if (auto max_partition = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
+      max_partition_millis_ = *max_partition;
+  }
+  logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
+                      int64_t{max_partition_millis_.count()});
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.use_direct_reads = true;
+  // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after
+  // This shouldn't go above 16MB and the configured total size of the db should cap it as well
+  int64_t max_buffer_size = 16 << 20;
+  options.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, max_partition_bytes_));
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 1;
+
+  options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
+  options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
+  if (max_partition_millis_ > std::chrono::milliseconds(0)) {
+    options.ttl = std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count();
+  }
+
+  logger_->log_info("Write buffer: %llu", options.write_buffer_size);
+  logger_->log_info("Max partition bytes: %llu", max_partition_bytes_);
+  logger_->log_info("Ttl: %llu", options.ttl);
+
+  rocksdb::DB* db;
+  rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
+  if (status.ok()) {
+    logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
+    db_.reset(db);
+  } else {
+    logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
+    return false;
+  }
+
+  return true;
+}
+
+bool ProvenanceRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
+  // persist to the DB
+  rocksdb::Slice value((const char *) buf, bufLen);
+  return db_->Put(rocksdb::WriteOptions(), key, value).ok();
+}
+
+bool ProvenanceRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) {
+  rocksdb::WriteBatch batch;
+  for (const auto &item : data) {
+    const auto buf = item.second->getBuffer().as_span<const char>();
+    rocksdb::Slice value(buf.data(), buf.size());
+    if (!batch.Put(item.first, value).ok()) {
+      return false;
+    }
+  }
+  return db_->Write(rocksdb::WriteOptions(), &batch).ok();
+}
+
+bool ProvenanceRepository::Get(const std::string &key, std::string &value) {
+  return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
+}
+
+bool ProvenanceRepository::Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
+  return Put(key, buffer, bufferSize);
+}
+
+bool ProvenanceRepository::get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
+    std::string key = it->key().ToString();
+    if (store.size() >= max_size)
+      break;
+    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
+      store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
+    }
+  }
+  return true;
+}
+
+bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size,
+                                       std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+  size_t requested_batch = max_size;
+  max_size = 0;
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    if (max_size >= requested_batch)
+      break;
+    std::shared_ptr<core::SerializableComponent> eventRead = lambda();
+    std::string key = it->key().ToString();
+    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
+      max_size++;
+      records.push_back(eventRead);
+    }
+  }
+  return max_size > 0;
+}
+
+bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+  // Deserializes DB entries, in iteration order, into the caller-provided records held in `store`.
+  // On return, max_size holds the number of records successfully filled; entries that fail to deserialize are skipped.
+  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
+  max_size = 0;
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    // Stop once every slot is filled; checking first also keeps store.at() in range (e.g. for an empty store).
+    if (max_size >= store.size())
+      break;
+    if (store.at(max_size)->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
+      max_size++;
+    }
+  }
+  return max_size > 0;
+}
+
+void ProvenanceRepository::destroy() {
+  db_.reset();
+}
+
+uint64_t ProvenanceRepository::getKeyCount() const {
+  std::string key_count;  // RocksDB's estimate of the number of keys; 0 is returned if it is unavailable
+  if (!db_ || !db_->GetProperty("rocksdb.estimate-num-keys", &key_count) || key_count.empty())
+    return 0;  // db_ may have been reset by destroy(); std::stoull would throw on an empty string
+  return std::stoull(key_count);
+}
+
 REGISTER_RESOURCE_AS(ProvenanceRepository, InternalResource, ("ProvenanceRepository", "provenancerepository"));
 
 }  // namespace org::apache::nifi::minifi::provenance
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 31ea9e37e..0dc9cb6e4 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -42,16 +42,16 @@ constexpr auto PROVENANCE_PURGE_PERIOD = std::chrono::milliseconds(2500);
 class ProvenanceRepository : public core::ThreadedRepository {
  public:
   ProvenanceRepository(std::string name, const utils::Identifier& /*uuid*/)
-      : ProvenanceRepository(std::move(name)) {
+    : ProvenanceRepository(std::move(name)) {
   }
 
-  explicit ProvenanceRepository(std::string repo_name = "", std::string directory = PROVENANCE_DIRECTORY,
-      std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
-      int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
-      std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
-    : core::SerializableComponent(repo_name),
-      ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<ProvenanceRepository>(), directory,
-        maxPartitionMillis, maxPartitionBytes, purgePeriod) {
+  explicit ProvenanceRepository(std::string repo_name = "",
+                                std::string directory = PROVENANCE_DIRECTORY,
+                                std::chrono::milliseconds maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME,
+                                int64_t maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE,
+                                std::chrono::milliseconds purgePeriod = PROVENANCE_PURGE_PERIOD)
+    : ThreadedRepository(repo_name.length() > 0 ? std::move(repo_name) : core::getClassName<ProvenanceRepository>(),
+        directory, maxPartitionMillis, maxPartitionBytes, purgePeriod) {
   }
 
   ~ProvenanceRepository() override {
@@ -68,145 +68,26 @@ class ProvenanceRepository : public core::ThreadedRepository {
     return false;
   }
 
-  bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) override {
-    std::string value;
-    if (config->get(Configure::nifi_provenance_repository_directory_default, value) && !value.empty()) {
-      directory_ = value;
-    }
-    logger_->log_debug("MiNiFi Provenance Repository Directory %s", directory_);
-    if (config->get(Configure::nifi_provenance_repository_max_storage_size, value)) {
-      core::Property::StringToInt(value, max_partition_bytes_);
-    }
-    logger_->log_debug("MiNiFi Provenance Max Partition Bytes %d", max_partition_bytes_);
-    if (config->get(Configure::nifi_provenance_repository_max_storage_time, value)) {
-      if (auto max_partition = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value))
-        max_partition_millis_ = *max_partition;
-    }
-    logger_->log_debug("MiNiFi Provenance Max Storage Time: [%" PRId64 "] ms",
-                       int64_t{max_partition_millis_.count()});
-    rocksdb::Options options;
-    options.create_if_missing = true;
-    options.use_direct_io_for_flush_and_compaction = true;
-    options.use_direct_reads = true;
-    // Rocksdb write buffers act as a log of database operation: grow till reaching the limit, serialized after
-    // This shouldn't go above 16MB and the configured total size of the db should cap it as well
-    int64_t max_buffer_size = 16 << 20;
-    options.write_buffer_size = gsl::narrow<size_t>(std::min(max_buffer_size, max_partition_bytes_));
-    options.max_write_buffer_number = 4;
-    options.min_write_buffer_number_to_merge = 1;
-
-    options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleFIFO;
-    options.compaction_options_fifo = rocksdb::CompactionOptionsFIFO(max_partition_bytes_, false);
-    if (max_partition_millis_ > std::chrono::milliseconds(0)) {
-      options.ttl = std::chrono::duration_cast<std::chrono::seconds>(max_partition_millis_).count();
-    }
-
-    logger_->log_info("Write buffer: %llu", options.write_buffer_size);
-    logger_->log_info("Max partition bytes: %llu", max_partition_bytes_);
-    logger_->log_info("Ttl: %llu", options.ttl);
-
-    rocksdb::DB* db;
-    rocksdb::Status status = rocksdb::DB::Open(options, directory_, &db);
-    if (status.ok()) {
-      logger_->log_debug("MiNiFi Provenance Repository database open %s success", directory_);
-      db_.reset(db);
-    } else {
-      logger_->log_error("MiNiFi Provenance Repository database open %s failed: %s", directory_, status.ToString());
-      return false;
-    }
+  bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &config) override;
 
-    return true;
-  }
-
-  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override {
-    // persist to the DB
-    rocksdb::Slice value((const char *) buf, bufLen);
-    return db_->Put(rocksdb::WriteOptions(), key, value).ok();
-  }
-
-  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
-    rocksdb::WriteBatch batch;
-    for (const auto &item : data) {
-      const auto buf = item.second->getBuffer().as_span<const char>();
-      rocksdb::Slice value(buf.data(), buf.size());
-      if (!batch.Put(item.first, value).ok()) {
-        return false;
-      }
-    }
-    return db_->Write(rocksdb::WriteOptions(), &batch).ok();
-  }
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override;
 
   bool Delete(const std::string& /*key*/) override {
     // The repo is cleaned up by itself, there is no need to delete items.
     return true;
   }
 
-  bool Get(const std::string &key, std::string &value) override {
-    return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
-  }
-
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override {
-    return Put(key, buffer, bufferSize);
-  }
-
-  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
-    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-      if (store.size() >= max_size)
-        break;
-      if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
-        store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
-      }
-    }
-    return true;
-  }
+  bool Get(const std::string &key, std::string &value) override;
 
+  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override;
   bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size,
-                   std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override {
-    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
-    size_t requested_batch = max_size;
-    max_size = 0;
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      if (max_size >= requested_batch)
-        break;
-      std::shared_ptr<core::SerializableComponent> eventRead = lambda();
-      std::string key = it->key().ToString();
-      if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
-        max_size++;
-        records.push_back(eventRead);
-      }
-    }
-    return max_size > 0;
-  }
-
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override {
-    std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
-    max_size = 0;
-    for (it->SeekToFirst(); it->Valid(); it->Next()) {
-      std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-      std::string key = it->key().ToString();
-
-      if (store.at(max_size)->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
-        max_size++;
-      }
-      if (store.size() >= max_size)
-        break;
-    }
-    return max_size > 0;
-  }
+                   std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override;
+  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override;
 
-  void destroy() {
-    db_.reset();
-  }
-
-  uint64_t getKeyCount() const {
-    std::string key_count;
-    db_->GetProperty("rocksdb.estimate-num-keys", &key_count);
-
-    return std::stoull(key_count);
-  }
+  void destroy();
+  uint64_t getKeyCount() const;
+  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size);
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index b53f66c9b..84ac4eded 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -20,22 +20,22 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <utility>
 
 #include "properties/Configure.h"
 #include "ResourceClaim.h"
-#include "io/BufferStream.h"
 #include "StreamManager.h"
-#include "core/Connectable.h"
+#include "core/Core.h"
 #include "ContentSession.h"
-#include "utils/GeneralUtils.h"
 
 namespace org::apache::nifi::minifi::core {
 
 /**
  * Content repository definition that extends StreamManager.
  */
-class ContentRepository : public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository> {
+class ContentRepository : public StreamManager<minifi::ResourceClaim>, public utils::EnableSharedFromThis<ContentRepository>, public core::CoreComponent {
  public:
+  explicit ContentRepository(std::string name, const utils::Identifier& uuid = {}) : core::CoreComponent(std::move(name), uuid) {}
   ~ContentRepository() override = default;
 
   /**
@@ -44,22 +44,16 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim>, public ut
   virtual bool initialize(const std::shared_ptr<Configure> &configure) = 0;
 
   std::string getStoragePath() const override;
-
   virtual std::shared_ptr<ContentSession> createSession();
-
   void reset();
 
   uint32_t getStreamCount(const minifi::ResourceClaim &streamId) override;
-
   void incrementStreamCount(const minifi::ResourceClaim &streamId) override;
-
   StreamState decrementStreamCount(const minifi::ResourceClaim &streamId) override;
 
  protected:
   std::string directory_;
-
   std::mutex count_map_mutex_;
-
   std::map<std::string, uint32_t> count_map_;
 };
 
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 54ffeab2e..901efcecb 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
+#pragma once
 
 #include <atomic>
 #include <cstdint>
@@ -55,21 +54,21 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
 constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class Repository : public virtual core::SerializableComponent {
+class Repository : public core::SerializableComponent {
  public:
   explicit Repository(std::string repo_name = "Repository",
-             std::string directory = REPOSITORY_DIRECTORY,
-             std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
-             int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
-             std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(std::move(repo_name)),
-        directory_(std::move(directory)),
-        max_partition_millis_(maxPartitionMillis),
-        max_partition_bytes_(maxPartitionBytes),
-        purge_period_(purgePeriod),
-        repo_full_(false),
-        repo_size_(0),
-        logger_(logging::LoggerFactory<Repository>::getLogger()) {
+                      std::string directory = REPOSITORY_DIRECTORY,
+                      std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
+                      int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
+                      std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
+    : core::SerializableComponent(std::move(repo_name)),
+      directory_(std::move(directory)),
+      max_partition_millis_(maxPartitionMillis),
+      max_partition_bytes_(maxPartitionBytes),
+      purge_period_(purgePeriod),
+      repo_full_(false),
+      repo_size_(0),
+      logger_(logging::LoggerFactory<Repository>::getLogger()) {
   }
 
   virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
@@ -117,7 +116,6 @@ class Repository : public virtual core::SerializableComponent {
     return false;
   }
 
-  // whether the repo is full
   virtual bool isFull() {
     return repo_full_;
   }
@@ -164,23 +162,14 @@ class Repository : public virtual core::SerializableComponent {
     return true;
   }
 
-  /**
-   * Base implementation returns true;
-   */
   bool Serialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override {
     return true;
   }
 
-  /**
-   * Base implementation returns true;
-   */
   bool DeSerialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override {
     return true;
   }
 
-  /**
-   * Base implementation returns true;
-   */
   bool DeSerialize(gsl::span<const std::byte>) override {
     return true;
   }
@@ -215,7 +204,6 @@ class Repository : public virtual core::SerializableComponent {
   // max db size
   int64_t max_partition_bytes_;
   std::chrono::milliseconds purge_period_;
-  // whether to stop accepting provenance event
   std::atomic<bool> repo_full_;
 
   std::atomic<uint64_t> repo_size_;
@@ -225,4 +213,3 @@ class Repository : public virtual core::SerializableComponent {
 };
 
 }  // namespace org::apache::nifi::minifi::core
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_H_
diff --git a/libminifi/include/core/repository/FileSystemRepository.h b/libminifi/include/core/repository/FileSystemRepository.h
index 5b6edaecb..66cec39ef 100644
--- a/libminifi/include/core/repository/FileSystemRepository.h
+++ b/libminifi/include/core/repository/FileSystemRepository.h
@@ -22,31 +22,24 @@
 #include <string>
 #include <utility>
 
-#include "core/Core.h"
 #include "../ContentRepository.h"
 #include "properties/Configure.h"
 #include "core/logging/LoggerFactory.h"
 
 namespace org::apache::nifi::minifi::core::repository {
 
-/**
- * FileSystemRepository is a content repository that stores data onto the local file system.
- */
-class FileSystemRepository : public core::ContentRepository, public core::CoreComponent {
+class FileSystemRepository : public core::ContentRepository {
  public:
   explicit FileSystemRepository(std::string name = getClassName<FileSystemRepository>())
-      : core::CoreComponent(std::move(name)),
-        logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
+    : core::ContentRepository(std::move(name)),
+      logger_(logging::LoggerFactory<FileSystemRepository>::getLogger()) {
   }
 
   ~FileSystemRepository() override = default;
 
   bool initialize(const std::shared_ptr<minifi::Configure>& configuration) override;
-
   bool exists(const minifi::ResourceClaim& streamId) override;
-
   std::shared_ptr<io::BaseStream> write(const minifi::ResourceClaim& claim, bool append = false) override;
-
   std::shared_ptr<io::BaseStream> read(const minifi::ResourceClaim& claim) override;
 
   bool close(const minifi::ResourceClaim& claim) override {
@@ -54,7 +47,6 @@ class FileSystemRepository : public core::ContentRepository, public core::CoreCo
   }
 
   bool remove(const minifi::ResourceClaim& claim) override;
-
   std::shared_ptr<ContentSession> createSession() override;
 
  private:
diff --git a/libminifi/include/core/repository/VolatileContentRepository.h b/libminifi/include/core/repository/VolatileContentRepository.h
index 2759af81c..54c1dcc6f 100644
--- a/libminifi/include/core/repository/VolatileContentRepository.h
+++ b/libminifi/include/core/repository/VolatileContentRepository.h
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
+#pragma once
 
 #include <map>
 #include <memory>
 #include <string>
 
-#include "core/Core.h"
 #include "AtomicRepoEntries.h"
 #include "io/AtomicEntryStream.h"
 #include "../ContentRepository.h"
@@ -31,23 +29,22 @@
 #include "core/Connectable.h"
 #include "core/logging/LoggerFactory.h"
 #include "utils/GeneralUtils.h"
+#include "VolatileRepositoryData.h"
 
 namespace org::apache::nifi::minifi::core::repository {
-
 /**
  * Purpose: Stages content into a volatile area of memory. Note that when the maximum number
  * of entries is consumed we will rollback a session to wait for others to be freed.
  */
-class VolatileContentRepository : public core::ContentRepository, public core::repository::VolatileRepository<ResourceClaim::Path> {
+class VolatileContentRepository : public core::ContentRepository {
  public:
   static const char *minimal_locking;
 
   explicit VolatileContentRepository(std::string name = getClassName<VolatileContentRepository>())
-      : core::SerializableComponent(name),
-        core::repository::VolatileRepository<ResourceClaim::Path>(name),
-        minimize_locking_(true),
-        logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
-    max_count_ = 15000;
+    : core::ContentRepository(name),
+      repo_data_(15000, static_cast<size_t>(10_MiB * 0.75)),
+      minimize_locking_(true),
+      logger_(logging::LoggerFactory<VolatileContentRepository>::getLogger()) {
   }
 
   ~VolatileContentRepository() override {
@@ -61,14 +58,6 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
     }
   }
 
-  bool start() override {
-    return true;
-  }
-
-  bool stop() override {
-    return true;
-  }
-
   /**
    * Initialize the volatile content repo
    * @param configure configuration
@@ -106,18 +95,14 @@ class VolatileContentRepository : public core::ContentRepository, public core::r
   bool remove(const minifi::ResourceClaim &claim) override;
 
  private:
+  VolatileRepositoryData repo_data_;
   bool minimize_locking_;
 
   // mutex and master list that represent a cache of Atomic entries. this exists so that we don't have to walk the atomic entry list.
   // The idea is to reduce the computational complexity while keeping access as maximally lock free as we can.
   std::mutex map_mutex_;
-
   std::map<ResourceClaim::Path, AtomicEntry<ResourceClaim::Path>*> master_list_;
-
-  // logger
   std::shared_ptr<logging::Logger> logger_;
 };
 
 }  // namespace org::apache::nifi::minifi::core::repository
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILECONTENTREPOSITORY_H_
diff --git a/libminifi/include/core/repository/VolatileFlowFileRepository.h b/libminifi/include/core/repository/VolatileFlowFileRepository.h
index 23b00be97..a3b0d9261 100644
--- a/libminifi/include/core/repository/VolatileFlowFileRepository.h
+++ b/libminifi/include/core/repository/VolatileFlowFileRepository.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -28,18 +27,13 @@
 
 struct VolatileFlowFileRepositoryTestAccessor;
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
 /**
  * Volatile flow file repository. keeps a running counter of the current location, freeing
  * those which we no longer hold.
  */
-class VolatileFlowFileRepository : public VolatileRepository<std::string, core::ThreadedRepository> {
+class VolatileFlowFileRepository : public VolatileRepository {
   friend struct ::VolatileFlowFileRepositoryTestAccessor;
 
  public:
@@ -48,10 +42,7 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string, core::
                                       std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
                                       int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                                       std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) {
-    purge_required_ = true;
-    content_repo_ = nullptr;
+    : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) {
   }
 
   ~VolatileFlowFileRepository() override {
@@ -72,19 +63,20 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string, core::
   }
 
   void flush() override {
-    if (purge_required_ && nullptr != content_repo_) {
-      std::lock_guard<std::mutex> lock(purge_mutex_);
-      for (auto purgeItem : purge_list_) {
-        utils::Identifier containerId;
-        auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(purgeItem).as_span<const std::byte>(), content_repo_, containerId);
-        if (eventRead) {
-          auto claim = eventRead->getResourceClaim();
-          if (claim) claim->decreaseFlowFileRecordOwnedCount();
-        }
+    if (!content_repo_) {
+      return;
+    }
+    std::lock_guard<std::mutex> lock(purge_mutex_);
+    for (auto purgeItem : purge_list_) {
+      utils::Identifier containerId;
+      auto eventRead = FlowFileRecord::DeSerialize(gsl::make_span(purgeItem).as_span<const std::byte>(), content_repo_, containerId);
+      if (eventRead) {
+        auto claim = eventRead->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();
       }
-      purge_list_.resize(0);
-      purge_list_.clear();
     }
+    purge_list_.resize(0);
+    purge_list_.clear();
   }
 
   void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override {
@@ -101,11 +93,4 @@ class VolatileFlowFileRepository : public VolatileRepository<std::string, core::
   std::shared_ptr<core::ContentRepository> content_repo_;
   std::thread thread_;
 };
-}  // namespace repository
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEFLOWFILEREPOSITORY_H_
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/include/core/repository/VolatileProvenanceRepository.h b/libminifi/include/core/repository/VolatileProvenanceRepository.h
index a8f03f198..ccebf22c9 100644
--- a/libminifi/include/core/repository/VolatileProvenanceRepository.h
+++ b/libminifi/include/core/repository/VolatileProvenanceRepository.h
@@ -15,30 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_
+#pragma once
 
 #include <string>
 
 #include "VolatileRepository.h"
 #include "core/ThreadedRepository.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
-class VolatileProvenanceRepository : public VolatileRepository<std::string, core::ThreadedRepository> {
+class VolatileProvenanceRepository : public VolatileRepository {
  public:
   explicit VolatileProvenanceRepository(std::string repo_name = "",
                                         std::string /*dir*/ = REPOSITORY_DIRECTORY,
                                         std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
                                         int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                                         std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name), VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) { // NOLINT
-    purge_required_ = false;
+    : VolatileRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod) {
   }
 
   ~VolatileProvenanceRepository() override {
@@ -60,11 +53,4 @@ class VolatileProvenanceRepository : public VolatileRepository<std::string, core
   std::thread thread_;
 };
 
-}  // namespace repository
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEPROVENANCEREPOSITORY_H_
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 097f551c5..701e5896f 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEREPOSITORY_H_
-#define LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEREPOSITORY_H_
+#pragma once
 
 #include <chrono>
 #include <limits>
@@ -30,47 +29,26 @@
 #include "AtomicRepoEntries.h"
 #include "Connection.h"
 #include "core/Core.h"
-#include "core/Repository.h"
+#include "core/ThreadedRepository.h"
 #include "core/SerializableComponent.h"
 #include "utils/StringUtils.h"
+#include "VolatileRepositoryData.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
+namespace org::apache::nifi::minifi::core::repository {
 
-/**
- * Flow File repository
- * Design: Extends Repository and implements the run function, using RocksDB as the primary substrate.
- */
-template<typename KeyType, typename RepositoryType = core::Repository>
-class VolatileRepository : public RepositoryType {
+class VolatileRepository : public core::ThreadedRepository {
  public:
-  static const char *volatile_repo_max_count;
-  static const char *volatile_repo_max_bytes;
-
   explicit VolatileRepository(std::string repo_name = "",
                               std::string /*dir*/ = REPOSITORY_DIRECTORY,
                               std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
                               int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                               std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
-      : core::SerializableComponent(repo_name),
-        RepositoryType(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
-        current_size_(0),
-        current_index_(0),
-        max_count_(10000),
-        max_size_(static_cast<size_t>(maxPartitionBytes * 0.75)),
-        logger_(logging::LoggerFactory<VolatileRepository>::getLogger()) {
-    purge_required_ = false;
+    : core::ThreadedRepository(repo_name.length() > 0 ? repo_name : core::getClassName<VolatileRepository>(), "", maxPartitionMillis, maxPartitionBytes, purgePeriod),
+      repo_data_(10000, static_cast<size_t>(maxPartitionBytes * 0.75)),
+      current_index_(0),
+      logger_(logging::LoggerFactory<VolatileRepository>::getLogger()) {
   }
 
-  ~VolatileRepository() override;
-
-  /**
-   * Initialize the volatile repository
-   **/
   bool initialize(const std::shared_ptr<Configure> &configure) override;
 
   bool isNoop() const override {
@@ -82,26 +60,26 @@ class VolatileRepository : public RepositoryType {
    * @param key key to add to the repository
    * @param buf buffer
    **/
-  bool Put(const KeyType& key, const uint8_t *buf, size_t bufLen) override;
+  bool Put(const std::string& key, const uint8_t *buf, size_t bufLen) override;
 
   /**
    * Places new objects into the volatile memory area
    * @param data the key-value pairs to add to the repository
    **/
-  bool MultiPut(const std::vector<std::pair<KeyType, std::unique_ptr<io::BufferStream>>>& data) override;
+  bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>& data) override;
 
   /**
    * Deletes the key
    * @return status of the delete operation
    */
-  bool Delete(const KeyType& key) override;
+  bool Delete(const std::string& key) override;
 
   /**
    * Sets the value from the provided key. Once the item is retrieved
    * it may not be retrieved again.
    * @return status of the get operation.
    */
-  bool Get(const KeyType& key, std::string &value) override;
+  bool Get(const std::string& key, std::string &value) override;
   /**
    * Deserializes objects into store
    * @param store vector in which we will store newly created objects.
@@ -123,251 +101,22 @@ class VolatileRepository : public RepositoryType {
   void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override;
 
   uint64_t getRepoSize() const override {
-    return current_size_;
+    return repo_data_.current_size;
   }
 
  protected:
-  virtual void emplace(RepoValue<KeyType> &old_value) {
+  virtual void emplace(RepoValue<std::string> &old_value) {
     std::lock_guard<std::mutex> lock(purge_mutex_);
     purge_list_.push_back(old_value.getKey());
   }
 
-  // current size of the volatile repo.
-  std::atomic<size_t> current_size_;
-  // current index.
+  VolatileRepositoryData repo_data_;
   std::atomic<uint32_t> current_index_;
-  // value vector that exists for non blocking iteration over
-  // objects that store data for this repo instance.
-  std::vector<AtomicEntry<KeyType>*> value_vector_;
-
-  // max count we are allowed to store.
-  uint32_t max_count_;
-  // maximum estimated size
-  size_t max_size_;
-
-  bool purge_required_;
-
   std::mutex purge_mutex_;
-  // purge list
-  std::vector<KeyType> purge_list_;
+  std::vector<std::string> purge_list_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
 };
 
-template<typename KeyType, typename RepositoryType>
-const char *VolatileRepository<KeyType, RepositoryType>::volatile_repo_max_count = "max.count";
-template<typename KeyType, typename RepositoryType>
-const char *VolatileRepository<KeyType, RepositoryType>::volatile_repo_max_bytes = "max.bytes";
-
-template<typename KeyType, typename RepositoryType>
-void VolatileRepository<KeyType, RepositoryType>::loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
-}
-
-// Destructor
-template<typename KeyType, typename RepositoryType>
-VolatileRepository<KeyType, RepositoryType>::~VolatileRepository() {
-  for (auto ent : value_vector_) {
-    delete ent;
-  }
-}
-
-/**
- * Initialize the volatile repository
- **/
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::initialize(const std::shared_ptr<Configure> &configure) {
-  std::string value = "";
-
-  if (configure != nullptr) {
-    int64_t max_cnt = 0;
-    std::stringstream strstream;
-    strstream << Configure::nifi_volatile_repository_options << RepositoryType::getName() << "." << volatile_repo_max_count;
-    if (configure->get(strstream.str(), value)) {
-      if (core::Property::StringToInt(value, max_cnt)) {
-        max_count_ = gsl::narrow<uint32_t>(max_cnt);
-      }
-    }
-
-    strstream.str("");
-    strstream.clear();
-    int64_t max_bytes = 0;
-    strstream << Configure::nifi_volatile_repository_options << RepositoryType::getName() << "." << volatile_repo_max_bytes;
-    if (configure->get(strstream.str(), value)) {
-      if (core::Property::StringToInt(value, max_bytes)) {
-        if (max_bytes <= 0) {
-          max_size_ = std::numeric_limits<uint32_t>::max();
-        } else {
-          max_size_ = gsl::narrow<size_t>(max_bytes);
-        }
-      }
-    }
-  }
-
-  logging::LOG_INFO(logger_) << "Resizing value_vector_ for " << RepositoryType::getName() << " count is " << max_count_;
-  logging::LOG_INFO(logger_) << "Using a maximum size for " << RepositoryType::getName() << " of  " << max_size_;
-  value_vector_.reserve(max_count_);
-  for (uint32_t i = 0; i < max_count_; i++) {
-    value_vector_.emplace_back(new AtomicEntry<KeyType>(&current_size_, &max_size_));
-  }
-  return true;
-}
-
-/**
- * Places a new object into the volatile memory area
- * @param key key to add to the repository
- * @param buf buffer
- **/
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::Put(const KeyType& key, const uint8_t *buf, size_t bufLen) {
-  RepoValue<KeyType> new_value(key, buf, bufLen);
-
-  const size_t size = new_value.size();
-  bool updated = false;
-  size_t reclaimed_size = 0;
-  RepoValue<KeyType> old_value;
-  do {
-    uint32_t private_index = current_index_.fetch_add(1);
-    // round robin through the beginning
-    if (private_index >= max_count_) {
-      uint32_t new_index = private_index + 1;
-      if (current_index_.compare_exchange_weak(new_index, 1)) {
-        private_index = 0;
-      } else {
-        continue;
-      }
-    }
-
-    updated = value_vector_.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size);
-    logger_->log_debug("Set repo value at %u out of %u updated %u current_size %u, adding %u to  %u", private_index, max_count_, updated == true, reclaimed_size, size, current_size_.load());
-    if (updated && reclaimed_size > 0) {
-      emplace(old_value);
-    }
-    if (reclaimed_size > 0) {
-      /**
-       * this is okay since current_size_ is really an estimate.
-       * we don't need precise counts.
-       */
-      if (current_size_ < reclaimed_size) {
-        current_size_ = 0;
-      } else {
-        current_size_ -= reclaimed_size;
-      }
-    }
-  } while (!updated);
-  current_size_ += size;
-
-  logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, current_size_.load(), current_index_.load());
-  return true;
-}
-
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::MultiPut(const std::vector<std::pair<KeyType, std::unique_ptr<io::BufferStream>>>& data) {
-  for (const auto& item : data) {
-    if (!Put(item.first, item.second->getBuffer().template as_span<const uint8_t>().data(), item.second->size())) {
-      return false;
-    }
-  }
-  return true;
-}
-
-/**
- * Deletes the key
- * @return status of the delete operation
- */
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::Delete(const KeyType& key) {
-  logger_->log_debug("Delete from volatile");
-  for (auto ent : value_vector_) {
-    // let the destructor do the cleanup
-    RepoValue<KeyType> value;
-    if (ent->getValue(key, value)) {
-      current_size_ -= value.size();
-      logger_->log_debug("Delete and pushed into purge_list from volatile");
-      emplace(value);
-      return true;
-    }
-  }
-  return false;
-}
-/**
- * Sets the value from the provided key. Once the item is retrieved
- * it may not be retrieved again.
- * @return status of the get operation.
- */
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::Get(const KeyType &key, std::string &value) {
-  for (auto ent : value_vector_) {
-    // let the destructor do the cleanup
-    RepoValue<KeyType> repo_value;
-    if (ent->getValue(key, repo_value)) {
-      current_size_ -= value.size();
-      repo_value.emplace(value);
-      return true;
-    }
-  }
-  return false;
-}
-
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store,
-                                                      size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
-  size_t requested_batch = max_size;
-  max_size = 0;
-  for (auto ent : value_vector_) {
-    // let the destructor do the cleanup
-    RepoValue<KeyType> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      std::shared_ptr<core::SerializableComponent> newComponent = lambda();
-      // we've taken ownership of this repo value
-      newComponent->DeSerialize(repo_value.getBuffer());
-
-      store.push_back(newComponent);
-
-      current_size_ -= repo_value.getBuffer().size();
-
-      if (max_size++ >= requested_batch) {
-        break;
-      }
-    }
-  }
-  if (max_size > 0) {
-    return true;
-  } else {
-    return false;
-  }
-}
-
-template<typename KeyType, typename RepositoryType>
-bool VolatileRepository<KeyType, RepositoryType>::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
-  logger_->log_debug("VolatileRepository -- DeSerialize %u", current_size_.load());
-  max_size = 0;
-  for (auto ent : value_vector_) {
-    // let the destructor do the cleanup
-    RepoValue<KeyType> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      // we've taken ownership of this repo value
-      store.at(max_size)->DeSerialize(repo_value.getBuffer());
-      current_size_ -= repo_value.getBuffer().size();
-      if (max_size++ >= store.size()) {
-        break;
-      }
-    }
-  }
-  if (max_size > 0) {
-    return true;
-  } else {
-    return false;
-  }
-}
-
-}  // namespace repository
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_CORE_REPOSITORY_VOLATILEREPOSITORY_H_
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/include/core/repository/VolatileRepositoryData.h b/libminifi/include/core/repository/VolatileRepositoryData.h
new file mode 100644
index 000000000..a428126c5
--- /dev/null
+++ b/libminifi/include/core/repository/VolatileRepositoryData.h
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <vector>
+#include <atomic>
+#include <string>
+#include <memory>
+
+#include "AtomicRepoEntries.h"
+#include "properties/Configure.h"
+
+namespace org::apache::nifi::minifi::core::repository {
+
+static constexpr const char *VOLATILE_REPO_MAX_COUNT = "max.count";
+static constexpr const char *VOLATILE_REPO_MAX_BYTES = "max.bytes";
+
+struct VolatileRepositoryData {
+  VolatileRepositoryData(uint32_t max_count, size_t max_size);
+  ~VolatileRepositoryData();
+
+  void initialize(const std::shared_ptr<Configure> &configure, const std::string& repo_name);
+  void clear();
+  // current size of the volatile repo.
+  std::atomic<size_t> current_size;
+  // value vector that exists for non blocking iteration over
+  // objects that store data for this repo instance.
+  std::vector<AtomicEntry<std::string>*> value_vector;
+  // max count we are allowed to store.
+  uint32_t max_count;
+  // maximum estimated size
+  size_t max_size;
+};
+
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp
index fd1cb305e..8455043cb 100644
--- a/libminifi/src/core/RepositoryFactory.cpp
+++ b/libminifi/src/core/RepositoryFactory.cpp
@@ -30,8 +30,7 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-std::unique_ptr<core::ContentRepository>
-createContentRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {
+std::unique_ptr<core::ContentRepository> createContentRepository(const std::string& configuration_class_name, bool fail_safe, const std::string& repo_name) {
   std::string class_name_lc = configuration_class_name;
   std::transform(class_name_lc.begin(), class_name_lc.end(), class_name_lc.begin(), ::tolower);
   try {
@@ -62,8 +61,7 @@ createContentRepository(const std::string& configuration_class_name, bool fail_s
 class NoOpThreadedRepository : public core::ThreadedRepository {
  public:
   explicit NoOpThreadedRepository(std::string repo_name)
-          : core::SerializableComponent(repo_name),
-            ThreadedRepository(std::move(repo_name)) {
+    : ThreadedRepository(std::move(repo_name)) {
   }
 
   ~NoOpThreadedRepository() override {
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index db642e973..822433bd8 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -34,7 +34,10 @@ namespace org::apache::nifi::minifi::core::repository {
 const char *VolatileContentRepository::minimal_locking = "minimal.locking";
 
 bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &configure) {
-  VolatileRepository::initialize(configure);
+  repo_data_.initialize(configure, getName());
+
+  logging::LOG_INFO(logger_) << "Resizing repo_data_.value_vector for " << getName() << " count is " << repo_data_.max_count;
+  logging::LOG_INFO(logger_) << "Using a maximum size for " << getName() << " of  " << repo_data_.max_size;
 
   if (configure != nullptr) {
     std::string value;
@@ -45,10 +48,7 @@ bool VolatileContentRepository::initialize(const std::shared_ptr<Configure> &con
     }
   }
   if (!minimize_locking_) {
-    for (auto ent : value_vector_) {
-      delete ent;
-    }
-    value_vector_.clear();
+    repo_data_.clear();
   }
 
   return true;
@@ -71,7 +71,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::R
 
   int size = 0;
   if (LIKELY(minimize_locking_ == true)) {
-    for (auto ent : value_vector_) {
+    for (auto ent : repo_data_.value_vector) {
       if (ent->testAndSetKey(claim.getContentFullPath())) {
         std::lock_guard<std::mutex> lock(map_mutex_);
         master_list_[claim.getContentFullPath()] = ent;
@@ -86,7 +86,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const minifi::R
     if (claim_check != master_list_.end()) {
       return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), claim_check->second);
     } else {
-      auto *ent = new AtomicEntry<ResourceClaim::Path>(&current_size_, &max_size_);
+      auto *ent = new AtomicEntry<ResourceClaim::Path>(&repo_data_.current_size, &repo_data_.max_size);
       if (ent->testAndSetKey(claim.getContentFullPath())) {
         master_list_[claim.getContentFullPath()] = ent;
         return std::make_shared<io::AtomicEntryStream<ResourceClaim::Path>>(claim.getContentFullPath(), ent);
@@ -149,7 +149,7 @@ bool VolatileContentRepository::remove(const minifi::ResourceClaim &claim) {
       auto size = claim_item->second->getLength();
       delete claim_item->second;
       master_list_.erase(claim.getContentFullPath());
-      current_size_ -= size;
+      repo_data_.current_size -= size;
     }
     return true;
   }
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index 6e3e1c698..8bbd81b48 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -23,16 +23,138 @@
 #include <vector>
 #include "FlowFileRecord.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
-namespace repository {
-
-} /* namespace repository */
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+namespace org::apache::nifi::minifi::core::repository {
+
+// Intentionally empty: no state is restored from the content repository here.
+void VolatileRepository::loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) {}
+
+/**
+ * Reads the slot-count and size limits for this repository from configure,
+ * allocates the slot pool, and logs the effective limits. Always succeeds.
+ */
+bool VolatileRepository::initialize(const std::shared_ptr<Configure> &configure) {
+  const auto& repo_name = core::ThreadedRepository::getName();
+  repo_data_.initialize(configure, repo_name);
+
+  // Logged after initialize() so the values reflect any configured overrides.
+  logging::LOG_INFO(logger_) << "Resizing value_vector for " << repo_name << " count is " << repo_data_.max_count;
+  logging::LOG_INFO(logger_) << "Using a maximum size for " << repo_name << " of  " << repo_data_.max_size;
+  return true;
+}
+
+/**
+ * Stores a copy of buf (bufLen bytes) under key in the round-robin slot pool.
+ *
+ * Slots are claimed by incrementing current_index_; when the index runs past
+ * max_count it is reset so that slot 0 is reused. If writing a slot reclaims a
+ * previous value, that value is passed to emplace() and its size is taken off
+ * the current_size estimate. The loop retries with another slot until a write
+ * succeeds.
+ *
+ * @return false when there is no slot to write to (value_vector is empty, e.g.
+ *         a max count of 0), true once the value has been stored.
+ */
+bool VolatileRepository::Put(const std::string& key, const uint8_t *buf, size_t bufLen) {
+  if (repo_data_.value_vector.empty()) {
+    // value_vector.at(0) below would throw std::out_of_range; report failure instead.
+    logger_->log_error("VolatileRepository -- cannot put %s, there are no repository slots", key.c_str());
+    return false;
+  }
+
+  RepoValue<std::string> new_value(key, buf, bufLen);
+
+  const size_t size = new_value.size();
+  bool updated = false;
+  size_t reclaimed_size = 0;
+  RepoValue<std::string> old_value;
+  do {
+    uint32_t private_index = current_index_.fetch_add(1);
+    // round robin through the beginning
+    if (private_index >= repo_data_.max_count) {
+      uint32_t new_index = private_index + 1;
+      if (current_index_.compare_exchange_weak(new_index, 1)) {
+        private_index = 0;
+      } else {
+        continue;
+      }
+    }
+
+    updated = repo_data_.value_vector.at(private_index)->setRepoValue(new_value, old_value, reclaimed_size);
+    // size_t arguments need %zu: passing them for %u is undefined in printf-style formatting.
+    logger_->log_debug("Set repo value at %" PRIu32 " out of %" PRIu32 " updated %d reclaimed %zu, adding %zu to %zu",
+      private_index, repo_data_.max_count, updated, reclaimed_size, size, repo_data_.current_size.load());
+    if (updated && reclaimed_size > 0) {
+      emplace(old_value);
+    }
+    if (reclaimed_size > 0) {
+      /**
+       * this is okay since current_size is really an estimate.
+       * we don't need precise counts.
+       */
+      if (repo_data_.current_size < reclaimed_size) {
+        repo_data_.current_size = 0;
+      } else {
+        repo_data_.current_size -= reclaimed_size;
+      }
+    }
+  } while (!updated);
+  repo_data_.current_size += size;
+
+  logger_->log_debug("VolatileRepository -- put %zu %" PRIu32, repo_data_.current_size.load(), current_index_.load());
+  return true;
+}
+
+/**
+ * Put()s every (key, stream) pair in order. Stops at the first pair Put()
+ * rejects and returns false; pairs stored before that remain stored.
+ */
+bool VolatileRepository::MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<io::BufferStream>>>& data) {
+  for (const auto& [key, stream] : data) {
+    const bool stored = Put(key, stream->getBuffer().template as_span<const uint8_t>().data(), stream->size());
+    if (!stored) {
+      return false;
+    }
+  }
+  return true;
+}
+
+/**
+ * Removes the value stored under key, if any.
+ *
+ * On the first slot whose getValue(key, ...) succeeds, the size estimate is
+ * reduced by that value's size and the value is passed to emplace().
+ *
+ * @return true if a value for key was found, false otherwise.
+ */
+bool VolatileRepository::Delete(const std::string& key) {
+  logger_->log_debug("Delete from volatile");
+  for (auto* entry : repo_data_.value_vector) {
+    RepoValue<std::string> removed;
+    if (!entry->getValue(key, removed)) {
+      continue;
+    }
+    repo_data_.current_size -= removed.size();
+    logger_->log_debug("Delete and pushed into purge_list from volatile");
+    emplace(removed);
+    return true;
+  }
+  return false;
+}
+
+/**
+ * Looks up key and, on a match, fills value from the stored entry.
+ *
+ * getValue(key, ...) is the same call Delete() uses to remove an entry, so the
+ * size estimate is reduced by the size of the retrieved entry here as well.
+ *
+ * @return true if a value for key was found, false otherwise.
+ */
+bool VolatileRepository::Get(const std::string &key, std::string &value) {
+  for (auto ent : repo_data_.value_vector) {
+    // let the destructor do the cleanup
+    RepoValue<std::string> repo_value;
+    if (ent->getValue(key, repo_value)) {
+      // Account for the retrieved entry, measured before emplace() fills value
+      // from it; the caller's output string is not the entry that left the repo.
+      repo_data_.current_size -= repo_value.size();
+      repo_value.emplace(value);
+      return true;
+    }
+  }
+  return false;
+}
+
+/**
+ * Drains up to max_size stored values, deserializing each into a fresh
+ * component produced by lambda and appending it to store.
+ *
+ * @param store     receives the deserialized components (appended)
+ * @param max_size  in: maximum number of values to drain; out: number drained
+ * @param lambda    factory for the component each value is deserialized into
+ * @return true if at least one value was drained
+ */
+bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store,
+                                     size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+  size_t requested_batch = max_size;
+  max_size = 0;
+  for (auto ent : repo_data_.value_vector) {
+    // Check the batch limit before taking a value out of its slot. The old
+    // post-increment check ("max_size++ >= requested_batch") drained one extra value.
+    if (max_size >= requested_batch) {
+      break;
+    }
+    // let the destructor do the cleanup
+    RepoValue<std::string> repo_value;
+
+    if (ent->getValue(repo_value)) {
+      std::shared_ptr<core::SerializableComponent> newComponent = lambda();
+      // we've taken ownership of this repo value
+      newComponent->DeSerialize(repo_value.getBuffer());
+      store.push_back(newComponent);
+      repo_data_.current_size -= repo_value.getBuffer().size();
+      ++max_size;
+    }
+  }
+  return max_size > 0;
+}
+
+/**
+ * Drains stored values into the pre-allocated components of store, in order.
+ *
+ * @param store     components to deserialize into; its size caps how many
+ *                  values are drained
+ * @param max_size  out: number of components that were filled
+ * @return true if at least one component was filled
+ */
+bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
+  logger_->log_debug("VolatileRepository -- DeSerialize %zu", repo_data_.current_size.load());
+  max_size = 0;
+  for (auto ent : repo_data_.value_vector) {
+    // Check capacity before taking a value out of its slot. Otherwise, once
+    // store is full, store.at() would throw after the value had already been
+    // taken, and that value would be lost.
+    if (max_size >= store.size()) {
+      break;
+    }
+    // let the destructor do the cleanup
+    RepoValue<std::string> repo_value;
+
+    if (ent->getValue(repo_value)) {
+      // we've taken ownership of this repo value
+      store.at(max_size)->DeSerialize(repo_value.getBuffer());
+      repo_data_.current_size -= repo_value.getBuffer().size();
+      ++max_size;
+    }
+  }
+  return max_size > 0;
+}
+
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/core/repository/VolatileRepositoryData.cpp b/libminifi/src/core/repository/VolatileRepositoryData.cpp
new file mode 100644
index 000000000..a5ab41aff
--- /dev/null
+++ b/libminifi/src/core/repository/VolatileRepositoryData.cpp
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/repository/VolatileRepositoryData.h"
+
+#include "core/Property.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::core::repository {
+
+VolatileRepositoryData::VolatileRepositoryData(uint32_t max_count, size_t max_size)
+  : current_size(0),
+    max_count(max_count),
+    max_size(max_size) {
+}
+
+// Frees every slot still held in value_vector.
+VolatileRepositoryData::~VolatileRepositoryData() {
+  this->clear();
+}
+
+// value_vector owns its slots: delete each one, in order, before dropping the
+// now-dangling pointers.
+void VolatileRepositoryData::clear() {
+  for (auto* const slot : value_vector) {
+    delete slot;
+  }
+  value_vector.clear();
+}
+
+/**
+ * Applies the "<options><repo_name>.max.count" and "<options><repo_name>.max.bytes"
+ * overrides when configure is non-null, then appends max_count new slots to
+ * value_vector.
+ */
+void VolatileRepositoryData::initialize(const std::shared_ptr<Configure> &configure, const std::string& repo_name) {
+  if (configure) {
+    // "<volatile repository options prefix><repo_name>.<suffix>"
+    const auto property_name = [&repo_name](const char* suffix) {
+      std::stringstream name;
+      name << Configure::nifi_volatile_repository_options << repo_name << "." << suffix;
+      return name.str();
+    };
+
+    std::string value;
+
+    int64_t configured_count = 0;
+    if (configure->get(property_name(VOLATILE_REPO_MAX_COUNT), value) && core::Property::StringToInt(value, configured_count)) {
+      max_count = gsl::narrow<uint32_t>(configured_count);
+    }
+
+    int64_t configured_bytes = 0;
+    if (configure->get(property_name(VOLATILE_REPO_MAX_BYTES), value) && core::Property::StringToInt(value, configured_bytes)) {
+      // A non-positive setting falls back to the largest uint32_t value.
+      max_size = configured_bytes > 0 ? gsl::narrow<size_t>(configured_bytes) : std::numeric_limits<uint32_t>::max();
+    }
+  }
+
+  // One heap-allocated slot per allowed entry; each slot is given pointers to
+  // this object's size counters.
+  value_vector.reserve(max_count);
+  for (uint32_t slot = 0; slot < max_count; ++slot) {
+    value_vector.emplace_back(new AtomicEntry<std::string>(&current_size, &max_size));
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/test/flow-tests/SessionTests.cpp b/libminifi/test/flow-tests/SessionTests.cpp
index d8bdaaa82..59cd8e26b 100644
--- a/libminifi/test/flow-tests/SessionTests.cpp
+++ b/libminifi/test/flow-tests/SessionTests.cpp
@@ -60,7 +60,6 @@ TEST_CASE("Import null data") {
   LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
   LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
   LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
-  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
   LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
 
   auto dir = testController.createTempDirectory();
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index ef24382fa..530aa06c6 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -277,7 +277,6 @@ TEST_CASE("Persisted flowFiles are updated on modification", "[TestP1]") {
   LogTestController::getInstance().setTrace<minifi::ResourceClaim>();
   LogTestController::getInstance().setTrace<minifi::FlowFileRecord>();
   LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
-  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
   LogTestController::getInstance().setTrace<core::repository::DatabaseContentRepository>();
 
   auto dir = testController.createTempDirectory();
diff --git a/libminifi/test/rocksdb-tests/RepoTests.cpp b/libminifi/test/rocksdb-tests/RepoTests.cpp
index 7b9048616..e280c0432 100644
--- a/libminifi/test/rocksdb-tests/RepoTests.cpp
+++ b/libminifi/test/rocksdb-tests/RepoTests.cpp
@@ -361,9 +361,9 @@ TEST_CASE("Flush deleted flowfiles before shutdown", "[TestFFR7]") {
   class TestFlowFileRepository: public core::repository::FlowFileRepository{
    public:
     explicit TestFlowFileRepository(const std::string& name)
-        : core::SerializableComponent(name),
-          FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
-                             10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
+      : FlowFileRepository(name, REPOTEST_FLOWFILE_CHECKPOINT_DIR, core::repository::FLOWFILE_REPOSITORY_DIRECTORY,
+                           10min, core::repository::MAX_FLOWFILE_REPOSITORY_STORAGE_SIZE, 1ms) {}
+
     void flush() override {
       FlowFileRepository::flush();
       if (onFlush_) {
diff --git a/libminifi/test/rocksdb-tests/SwapTests.cpp b/libminifi/test/rocksdb-tests/SwapTests.cpp
index eeb0a167c..8da38679f 100644
--- a/libminifi/test/rocksdb-tests/SwapTests.cpp
+++ b/libminifi/test/rocksdb-tests/SwapTests.cpp
@@ -83,7 +83,6 @@ TEST_CASE("Connection will on-demand swap flow files") {
   LogTestController::getInstance().setTrace<minifi::utils::FlowFileQueue>();
   LogTestController::getInstance().setTrace<minifi::FlowFileLoader>();
   LogTestController::getInstance().setTrace<core::repository::FlowFileRepository>();
-  LogTestController::getInstance().setTrace<core::repository::VolatileRepository<minifi::ResourceClaim::Path>>();
 
   auto dir = testController.createTempDirectory();
 
diff --git a/libminifi/test/unit/ProcessSessionTests.cpp b/libminifi/test/unit/ProcessSessionTests.cpp
index ff4a78645..ff737f1de 100644
--- a/libminifi/test/unit/ProcessSessionTests.cpp
+++ b/libminifi/test/unit/ProcessSessionTests.cpp
@@ -135,7 +135,7 @@ struct VolatileFlowFileRepositoryTestAccessor {
 
 class TestVolatileFlowFileRepository : public core::repository::VolatileFlowFileRepository {
  public:
-  explicit TestVolatileFlowFileRepository(const std::string& name) : core::SerializableComponent(name) {}
+  explicit TestVolatileFlowFileRepository(const std::string& name) : core::repository::VolatileFlowFileRepository(name) {}
 
   bool MultiPut(const std::vector<std::pair<std::string, std::unique_ptr<minifi::io::BufferStream>>>& data) override {
     auto flush_on_exit = gsl::finally([&] {VolatileFlowFileRepositoryTestAccessor::call_flush(*this);});
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index d51ad27e7..6da7d14e1 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -42,8 +42,7 @@ template <typename T_BaseRepository>
 class TestRepositoryBase : public T_BaseRepository {
  public:
   TestRepositoryBase()
-      : org::apache::nifi::minifi::core::SerializableComponent("repo_name"),
-        T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) {
+    : T_BaseRepository("repo_name", "./dir", 1s, 100, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override {
@@ -140,10 +139,6 @@ class TestRepositoryBase : public T_BaseRepository {
 
 class TestRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::Repository> {
  public:
-  TestRepository()
-    : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
-  }
-
   bool start() override {
     return true;
   }
@@ -155,10 +150,6 @@ class TestRepository : public TestRepositoryBase<org::apache::nifi::minifi::core
 
 class TestThreadedRepository : public TestRepositoryBase<org::apache::nifi::minifi::core::ThreadedRepository> {
  public:
-  TestThreadedRepository()
-    : org::apache::nifi::minifi::core::SerializableComponent("repo_name") {
-  }
-
   ~TestThreadedRepository() override {
     stop();
   }
@@ -178,8 +169,7 @@ class TestThreadedRepository : public TestRepositoryBase<org::apache::nifi::mini
 class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepository {
  public:
   TestFlowRepository()
-      : org::apache::nifi::minifi::core::SerializableComponent("ff"),
-        org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 100, 0ms) {
+    : org::apache::nifi::minifi::core::ThreadedRepository("ff", "./dir", 1s, 100, 0ms) {
   }
 
   bool initialize(const std::shared_ptr<org::apache::nifi::minifi::Configure> &) override {
diff --git a/libminifi/test/unit/SwapTestController.h b/libminifi/test/unit/SwapTestController.h
index 954746664..c092601ee 100644
--- a/libminifi/test/unit/SwapTestController.h
+++ b/libminifi/test/unit/SwapTestController.h
@@ -55,9 +55,6 @@ struct SwapEvent {
 
 class SwappingFlowFileTestRepo : public TestFlowRepository, public minifi::SwapManager {
  public:
-  SwappingFlowFileTestRepo()
-      : core::SerializableComponent("ff") {}
-
   void store(std::vector<std::shared_ptr<core::FlowFile>> flow_files) override {
     std::vector<minifi::SwappedFlowFile> ids;
     for (const auto& ff : flow_files) {