You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by lo...@apache.org on 2023/01/11 15:14:06 UTC

[nifi-minifi-cpp] branch MINIFICPP-2028 updated (8c899cc2f -> 83e0af6c5)

This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a change to branch MINIFICPP-2028
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    omit 8c899cc2f MINIFICPP-2028 Remove SerializableComponent dependency from Repository
     new 83e0af6c5 MINIFICPP-2028 Remove SerializableComponent dependency from Repository

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (8c899cc2f)
            \
             N -- N -- N   refs/heads/MINIFICPP-2028 (83e0af6c5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 libminifi/include/provenance/Provenance.h | 47 ++++++++++++++---------------
 libminifi/src/provenance/Provenance.cpp   | 49 +++++++++++++++----------------
 2 files changed, 48 insertions(+), 48 deletions(-)


[nifi-minifi-cpp] 01/01: MINIFICPP-2028 Remove SerializableComponent dependency from Repository

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch MINIFICPP-2028
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 83e0af6c58de86bfc7660c5465aaade9a37d988c
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Jan 9 14:24:56 2023 +0100

    MINIFICPP-2028 Remove SerializableComponent dependency from Repository
---
 extensions/rocksdb-repos/ProvenanceRepository.cpp  |  42 +---
 extensions/rocksdb-repos/ProvenanceRepository.h    |   7 +-
 .../tests/unit/ProcessorTests.cpp                  |   6 +-
 libminifi/include/core/Repository.h                |  74 +------
 libminifi/include/core/SerializableComponent.h     |  74 +------
 .../include/core/repository/VolatileRepository.h   |  19 --
 libminifi/include/provenance/Provenance.h          | 240 ++++++++-------------
 libminifi/src/core/Repository.cpp                  |  46 ++++
 .../SiteToSiteProvenanceReportingTask.cpp          |   5 +-
 .../src/core/repository/VolatileRepository.cpp     |  44 ----
 libminifi/src/provenance/Provenance.cpp            | 180 +++++++---------
 libminifi/test/rocksdb-tests/ProvenanceTests.cpp   | 138 ++++++------
 libminifi/test/unit/ProvenanceTestHelper.h         |  33 +--
 13 files changed, 323 insertions(+), 585 deletions(-)

diff --git a/extensions/rocksdb-repos/ProvenanceRepository.cpp b/extensions/rocksdb-repos/ProvenanceRepository.cpp
index 0c7da870d..2680de9d5 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.cpp
+++ b/extensions/rocksdb-repos/ProvenanceRepository.cpp
@@ -122,35 +122,17 @@ bool ProvenanceRepository::Get(const std::string &key, std::string &value) {
   return db_->Get(rocksdb::ReadOptions(), key, &value).ok();
 }
 
-bool ProvenanceRepository::Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) {
-  return Put(key, buffer, bufferSize);
-}
-
-bool ProvenanceRepository::get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size) {
-  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-    std::string key = it->key().ToString();
-    if (store.size() >= max_size)
-      break;
-    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
-      store.push_back(std::dynamic_pointer_cast<core::CoreComponent>(eventRead));
-    }
-  }
-  return true;
-}
-
-bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size,
-                                       std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
+bool ProvenanceRepository::getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) {
   std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
   size_t requested_batch = max_size;
   max_size = 0;
   for (it->SeekToFirst(); it->Valid(); it->Next()) {
     if (max_size >= requested_batch)
       break;
-    std::shared_ptr<core::SerializableComponent> eventRead = lambda();
+    auto eventRead = std::make_shared<ProvenanceEventRecord>();
     std::string key = it->key().ToString();
-    if (eventRead->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
+    io::BufferStream stream(gsl::make_span(it->value()).as_span<const std::byte>());
+    if (eventRead->deserialize(stream)) {
       max_size++;
       records.push_back(eventRead);
     }
@@ -158,22 +140,6 @@ bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::Seriali
   return max_size > 0;
 }
 
-bool ProvenanceRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
-  std::unique_ptr<rocksdb::Iterator> it(db_->NewIterator(rocksdb::ReadOptions()));
-  max_size = 0;
-  for (it->SeekToFirst(); it->Valid(); it->Next()) {
-    std::shared_ptr<ProvenanceEventRecord> eventRead = std::make_shared<ProvenanceEventRecord>();
-    std::string key = it->key().ToString();
-
-    if (store.at(max_size)->DeSerialize(gsl::make_span(it->value()).as_span<const std::byte>())) {
-      max_size++;
-    }
-    if (store.size() >= max_size)
-      break;
-  }
-  return max_size > 0;
-}
-
 void ProvenanceRepository::destroy() {
   db_.reset();
 }
diff --git a/extensions/rocksdb-repos/ProvenanceRepository.h b/extensions/rocksdb-repos/ProvenanceRepository.h
index 0dc9cb6e4..7aaeaa34e 100644
--- a/extensions/rocksdb-repos/ProvenanceRepository.h
+++ b/extensions/rocksdb-repos/ProvenanceRepository.h
@@ -79,15 +79,10 @@ class ProvenanceRepository : public core::ThreadedRepository {
   }
 
   bool Get(const std::string &key, std::string &value) override;
-
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override;
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size,
-                   std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override;
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override;
+  bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>> &records, size_t &max_size) override;
 
   void destroy();
   uint64_t getKeyCount() const;
-  virtual bool get(std::vector<std::shared_ptr<core::CoreComponent>> &store, size_t max_size);
 
   // Prevent default copy constructor and assignment operation
   // Only support pass by reference or pointer
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index b7afc647c..eb56c1355 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -48,6 +48,7 @@
 #include "utils/PropertyErrors.h"
 #include "utils/IntegrationTestUtils.h"
 #include "Utils.h"
+#include "io/BufferStream.h"
 
 TEST_CASE("Test Creation of GetFile", "[getfileCreate]") {
   TestController testController;
@@ -448,7 +449,8 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
 
   for (auto entry : repo->getRepoMap()) {
     minifi::provenance::ProvenanceEventRecord newRecord;
-    newRecord.DeSerialize(gsl::make_span(entry.second).as_span<const std::byte>());
+    minifi::io::BufferStream stream(gsl::make_span(entry.second).as_span<const std::byte>());
+    newRecord.deserialize(stream);
 
     bool found = false;
     for (const auto& provRec : records) {
@@ -475,7 +477,7 @@ TEST_CASE("Test Find file", "[getfileCreate3]") {
   processorReport->setScheduledState(core::ScheduledState::RUNNING);
   std::string jsonStr;
   std::size_t deserialized = 0;
-  repo->DeSerialize(recordsReport, deserialized);
+  repo->getElements(recordsReport, deserialized);
   std::function<void(const std::shared_ptr<core::ProcessContext> &, const std::shared_ptr<core::ProcessSession>&)> verifyReporter =
       [&](const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
         taskReport->getJsonReport(context, session, recordsReport, jsonStr);
diff --git a/libminifi/include/core/Repository.h b/libminifi/include/core/Repository.h
index 901efcecb..247da7351 100644
--- a/libminifi/include/core/Repository.h
+++ b/libminifi/include/core/Repository.h
@@ -54,14 +54,14 @@ constexpr auto MAX_REPOSITORY_STORAGE_SIZE = 10_MiB;
 constexpr auto MAX_REPOSITORY_ENTRY_LIFE_TIME = std::chrono::minutes(10);
 constexpr auto REPOSITORY_PURGE_PERIOD = std::chrono::milliseconds(2500);
 
-class Repository : public core::SerializableComponent {
+class Repository : public core::CoreComponent {
  public:
   explicit Repository(std::string repo_name = "Repository",
                       std::string directory = REPOSITORY_DIRECTORY,
                       std::chrono::milliseconds maxPartitionMillis = MAX_REPOSITORY_ENTRY_LIFE_TIME,
                       int64_t maxPartitionBytes = MAX_REPOSITORY_STORAGE_SIZE,
                       std::chrono::milliseconds purgePeriod = REPOSITORY_PURGE_PERIOD)
-    : core::SerializableComponent(std::move(repo_name)),
+    : core::CoreComponent(std::move(repo_name)),
       directory_(std::move(directory)),
       max_partition_millis_(maxPartitionMillis),
       max_partition_bytes_(maxPartitionBytes),
@@ -71,10 +71,12 @@ class Repository : public core::SerializableComponent {
       logger_(logging::LoggerFactory<Repository>::getLogger()) {
   }
 
-  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
+  virtual bool isRunning() {
+    return true;
+  }
 
+  virtual bool initialize(const std::shared_ptr<Configure>& /*configure*/) = 0;
   virtual bool start() = 0;
-
   virtual bool stop() = 0;
 
   virtual bool isNoop() const {
@@ -96,13 +98,7 @@ class Repository : public core::SerializableComponent {
     return true;
   }
 
-  virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) {
-    bool found = true;
-    for (const auto& storedValue : storedValues) {
-      found &= Delete(storedValue->getName());
-    }
-    return found;
-  }
+  virtual bool Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues);
 
   void setConnectionMap(std::map<std::string, core::Connectable*> connectionMap) {
     connection_map_ = std::move(connectionMap);
@@ -120,63 +116,11 @@ class Repository : public core::SerializableComponent {
     return repo_full_;
   }
 
-  /**
-   * Specialization that allows us to serialize max_size objects into store.
-   * the lambdaConstructor will create objects to put into store
-   * @param store vector in which we can store serialized object
-   * @param max_size reference that stores the max number of objects to retrieve and serialize.
-   * upon return max_size will represent the number of serialized objects.
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool Serialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t /*max_size*/) {
-    return true;
-  }
-
-  /**
-   * Specialization that allows us to deserialize max_size objects into store.
-   * @param store vector in which we can store deserialized object
-   * @param max_size reference that stores the max number of objects to retrieve and deserialize.
-   * upon return max_size will represent the number of deserialized objects.
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) {
-    return true;
-  }
-
-  /**
-   * Specialization that allows us to deserialize max_size objects into store.
-   * the lambdaConstructor will create objects to put into store
-   * @param store vector in which we can store deserialized object
-   * @param max_size reference that stores the max number of objects to retrieve and deserialize.
-   * upon return max_size will represent the number of deserialized objects.
-   * @param lambdaConstructor reference that will create the objects for store
-   * @return status of this operation
-   *
-   * Base implementation returns true;
-   */
-  virtual bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/, std::function<std::shared_ptr<core::SerializableComponent>()> /*lambdaConstructor*/) { // NOLINT
-    return true;
-  }
-
-  bool Serialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override {
+  virtual bool getElements(std::vector<std::shared_ptr<core::SerializableComponent>>& /*store*/, size_t& /*max_size*/) {
     return true;
   }
 
-  bool DeSerialize(const std::shared_ptr<core::SerializableComponent>& /*store*/) override {
-    return true;
-  }
-
-  bool DeSerialize(gsl::span<const std::byte>) override {
-    return true;
-  }
-
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override {
-    return Put(key, buffer, bufferSize);
-  }
+  virtual bool storeElement(const std::shared_ptr<core::SerializableComponent> element);
 
   virtual void loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
   }
diff --git a/libminifi/include/core/SerializableComponent.h b/libminifi/include/core/SerializableComponent.h
index d5b0b037b..3d9480ebf 100644
--- a/libminifi/include/core/SerializableComponent.h
+++ b/libminifi/include/core/SerializableComponent.h
@@ -15,86 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
-#define LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
+#pragma once
 
-#include <memory>
 #include <string>
 #include <utility>
 
-#include "core/Connectable.h"
 #include "core/Core.h"
 #include "utils/gsl.h"
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::core {
 
-/**
- * Represents a component that is serializable and an extension point of core Component
- */
-class SerializableComponent : public core::Connectable {
+class SerializableComponent : public core::CoreComponent {
  public:
   explicit SerializableComponent(std::string name)
-        : core::Connectable(std::move(name)) {
-    }
-
-  SerializableComponent(std::string name, const utils::Identifier& uuid)
-      : core::Connectable(std::move(name), uuid) {
-  }
-
-  ~SerializableComponent() override = default;
-
-  /**
-   * Serialize this object into the the store
-   * @param store object in which we are serializing data into
-   * @return status of this serialization.
-   */
-  virtual bool Serialize(const std::shared_ptr<core::SerializableComponent> &store) = 0;
-
-  /**
-   * Deserialization from the parameter store into the current object
-   * @param store from which we are deserializing the current object
-   * @return status of this deserialization.
-   */
-  virtual bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) = 0;
-
-  /**
-   * Deserializes the current object using buffer
-   * @param buffer buffer from which we can deserialize the currenet object
-   * @param bufferSize length of buffer from which we can deserialize the current object.
-   * @return status of the deserialization.
-   */
-  virtual bool DeSerialize(gsl::span<const std::byte>) = 0;
-
-  /**
-   * Serialization of this object into buffer
-   * @param key string that represents this objects identifier
-   * @param buffer buffer that contains the serialized object
-   * @param bufferSize length of buffer
-   * @return status of serialization
-   */
-  virtual bool Serialize(const std::string& /*key*/, const uint8_t* /*buffer*/, const size_t /*bufferSize*/) {
-    return false;
+    : core::CoreComponent(std::move(name)) {
   }
 
-  void yield() override { }
-
-  /**
-   * Determines if we are connected and operating
-   */
-  bool isRunning() override {
-    return true;
-  }
+  virtual ~SerializableComponent() = default;
 
-  /**
-   * Determines if work is available by this connectable
-   * @return boolean if work is available.
-   */
-  bool isWorkAvailable() override {
-    return true;
-  }
+  virtual bool serialize(io::OutputStream& output_stream) = 0;
+  virtual bool deserialize(io::InputStream &input_stream) = 0;
 };
 
 }  // namespace org::apache::nifi::minifi::core
-
-#endif  // LIBMINIFI_INCLUDE_CORE_SERIALIZABLECOMPONENT_H_
-
diff --git a/libminifi/include/core/repository/VolatileRepository.h b/libminifi/include/core/repository/VolatileRepository.h
index 5bdf509cc..05d341f9d 100644
--- a/libminifi/include/core/repository/VolatileRepository.h
+++ b/libminifi/include/core/repository/VolatileRepository.h
@@ -82,25 +82,6 @@ class VolatileRepository : public core::ThreadedRepository {
    * @return status of the get operation.
    */
   bool Get(const std::string& key, std::string &value) override;
-  /**
-   * Deserializes objects into store
-   * @param store vector in which we will store newly created objects.
-   * @param max_size size of objects deserialized
-   */
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) override;
-
-  /**
-   * Deserializes objects into a store that contains a fixed number of objects in which
-   * we will deserialize from this repo
-   * @param store precreated object vector
-   * @param max_size size of objects deserialized
-   */
-  bool DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) override;
-
-  /**
-   * Function to load this component.
-   */
-  void loadComponent(const std::shared_ptr<core::ContentRepository> &content_repo) override;
 
   uint64_t getRepoSize() const override {
     return repo_data_.current_size;
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index e3feb6708..275701fcb 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
-#define LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
+#pragma once
 
 #include <algorithm>
 #include <atomic>
@@ -43,10 +42,7 @@
 #include "utils/TimeUtil.h"
 
 namespace org::apache::nifi::minifi::provenance {
-// Provenance Event Record Serialization Seg Size
-#define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048
 
-// Provenance Event Record
 class ProvenanceEventRecord : public core::SerializableComponent {
  public:
   enum ProvenanceEventType {
@@ -160,169 +156,168 @@ class ProvenanceEventRecord : public core::SerializableComponent {
     _eventTime = std::chrono::system_clock::now();
   }
 
-  // Destructor
   virtual ~ProvenanceEventRecord() = default;
-  // Get the Event ID
-  utils::Identifier getEventId() {
+
+  utils::Identifier getEventId() const {
     return getUUID();
   }
 
   void setEventId(const utils::Identifier &id) {
     setUUID(id);
   }
-  // Get Attributes
-  std::map<std::string, std::string> getAttributes() {
+
+  std::map<std::string, std::string> getAttributes() const {
     return _attributes;
   }
-  // Get Size
-  uint64_t getFileSize() {
+
+  uint64_t getFileSize() const {
     return _size;
   }
-  // ! Get Offset
-  uint64_t getFileOffset() {
+
+  uint64_t getFileOffset() const {
     return _offset;
   }
-  // ! Get Entry Date
-  std::chrono::system_clock::time_point getFlowFileEntryDate() {
+
+  std::chrono::system_clock::time_point getFlowFileEntryDate() const {
     return _entryDate;
   }
-  // ! Get Lineage Start Date
-  std::chrono::system_clock::time_point getlineageStartDate() {
+
+  std::chrono::system_clock::time_point getlineageStartDate() const {
     return _lineageStartDate;
   }
-  // ! Get Event Time
-  std::chrono::system_clock::time_point getEventTime() {
+
+  std::chrono::system_clock::time_point getEventTime() const {
     return _eventTime;
   }
-  // ! Get Event Duration
-  std::chrono::milliseconds getEventDuration() {
+
+  std::chrono::milliseconds getEventDuration() const {
     return _eventDuration;
   }
-  // Set Event Duration
+
   void setEventDuration(std::chrono::milliseconds duration) {
     _eventDuration = duration;
   }
-  // ! Get Event Type
-  ProvenanceEventType getEventType() {
+
+  ProvenanceEventType getEventType() const {
     return _eventType;
   }
-  // Get Component ID
-  std::string getComponentId() {
+
+  std::string getComponentId() const {
     return _componentId;
   }
-  // Get Component Type
-  std::string getComponentType() {
+
+  std::string getComponentType() const {
     return _componentType;
   }
-  // Get FlowFileUuid
-  utils::Identifier getFlowFileUuid() {
+
+  utils::Identifier getFlowFileUuid() const {
     return flow_uuid_;
   }
-  // Get content full path
-  std::string getContentFullPath() {
+
+  std::string getContentFullPath() const {
     return _contentFullPath;
   }
-  // Get LineageIdentifiers
-  std::vector<utils::Identifier> getLineageIdentifiers() {
+
+  std::vector<utils::Identifier> getLineageIdentifiers() const {
     return _lineageIdentifiers;
   }
-  // Get Details
-  std::string getDetails() {
+
+  std::string getDetails() const {
     return _details;
   }
-  // Set Details
-  void setDetails(std::string details) {
+
+  void setDetails(const std::string& details) {
     _details = details;
   }
-  // Get TransitUri
+
   std::string getTransitUri() {
     return _transitUri;
   }
-  // Set TransitUri
-  void setTransitUri(std::string uri) {
+
+  void setTransitUri(const std::string& uri) {
     _transitUri = uri;
   }
-  // Get SourceSystemFlowFileIdentifier
-  std::string getSourceSystemFlowFileIdentifier() {
+
+  std::string getSourceSystemFlowFileIdentifier() const {
     return _sourceSystemFlowFileIdentifier;
   }
-  // Set SourceSystemFlowFileIdentifier
-  void setSourceSystemFlowFileIdentifier(std::string identifier) {
+
+  void setSourceSystemFlowFileIdentifier(const std::string& identifier) {
     _sourceSystemFlowFileIdentifier = identifier;
   }
-  // Get Parent UUIDs
-  std::vector<utils::Identifier> getParentUuids() {
+
+  std::vector<utils::Identifier> getParentUuids() const {
     return _parentUuids;
   }
-  // Add Parent UUID
+
   void addParentUuid(const utils::Identifier& uuid) {
     if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end())
       return;
     else
       _parentUuids.push_back(uuid);
   }
-  // Add Parent Flow File
+
   void addParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     addParentUuid(flow->getUUID());
   }
-  // Remove Parent UUID
+
   void removeParentUuid(const utils::Identifier& uuid) {
     _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end());
   }
-  // Remove Parent Flow File
+
   void removeParentFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     removeParentUuid(flow->getUUID());
   }
-  // Get Children UUIDs
-  std::vector<utils::Identifier> getChildrenUuids() {
+
+  std::vector<utils::Identifier> getChildrenUuids() const {
     return _childrenUuids;
   }
-  // Add Child UUID
+
   void addChildUuid(const utils::Identifier& uuid) {
     if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end())
       return;
     else
       _childrenUuids.push_back(uuid);
   }
-  // Add Child Flow File
-  void addChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+
+  void addChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     addChildUuid(flow->getUUID());
     return;
   }
-  // Remove Child UUID
+
   void removeChildUuid(const utils::Identifier& uuid) {
     _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end());
   }
-  // Remove Child Flow File
-  void removeChildFlowFile(std::shared_ptr<core::FlowFile> flow) {
+
+  void removeChildFlowFile(const std::shared_ptr<core::FlowFile>& flow) {
     removeChildUuid(flow->getUUID());
   }
-  // Get AlternateIdentifierUri
-  std::string getAlternateIdentifierUri() {
+
+  std::string getAlternateIdentifierUri() const {
     return _alternateIdentifierUri;
   }
-  // Set AlternateIdentifierUri
-  void setAlternateIdentifierUri(std::string uri) {
+
+  void setAlternateIdentifierUri(const std::string& uri) {
     _alternateIdentifierUri = uri;
   }
-  // Get Relationship
-  std::string getRelationship() {
+
+  std::string getRelationship() const {
     return _relationship;
   }
-  // Set Relationship
-  void setRelationship(std::string relation) {
+
+  void setRelationship(const std::string& relation) {
     _relationship = relation;
   }
-  // Get sourceQueueIdentifier
-  std::string getSourceQueueIdentifier() {
+
+  std::string getSourceQueueIdentifier() const {
     return _sourceQueueIdentifier;
   }
-  // Set sourceQueueIdentifier
-  void setSourceQueueIdentifier(std::string identifier) {
+
+  void setSourceQueueIdentifier(const std::string& identifier) {
     _sourceQueueIdentifier = identifier;
   }
-  // fromFlowFile
-  void fromFlowFile(std::shared_ptr<core::FlowFile> &flow) {
+
+  void fromFlowFile(const std::shared_ptr<core::FlowFile> &flow) {
     _entryDate = flow->getEntryDate();
     _lineageStartDate = flow->getlineageStartDate();
     _lineageIdentifiers = flow->getlineageIdentifiers();
@@ -336,21 +331,12 @@ class ProvenanceEventRecord : public core::SerializableComponent {
       _contentFullPath = flow->getResourceClaim()->getContentFullPath();
     }
   }
-  using SerializableComponent::Serialize;
-
-  // Serialize the event to a stream
-  bool Serialize(org::apache::nifi::minifi::io::BufferStream& outStream);
 
-  // Serialize and Persistent to the repository
-  bool Serialize(const std::shared_ptr<core::SerializableComponent> &repo) override;
-  bool DeSerialize(gsl::span<const std::byte>) override;
-  bool DeSerialize(org::apache::nifi::minifi::io::BufferStream &stream) {
-    return DeSerialize(stream.getBuffer());
-  }
-  bool DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) override;
+  bool serialize(io::OutputStream& output_stream) override;
+  bool deserialize(io::InputStream &input_stream) override;
+  bool loadFromRepository(const std::shared_ptr<core::Repository> &repo);
 
  protected:
-  // Event type
   ProvenanceEventType _eventType;
   // Date at which the event was created
   std::chrono::system_clock::time_point _eventTime{};
@@ -358,39 +344,24 @@ class ProvenanceEventRecord : public core::SerializableComponent {
   std::chrono::system_clock::time_point _entryDate{};
   // Date at which the origin of this flow file entered the flow
   std::chrono::system_clock::time_point _lineageStartDate{};
-  // Event Duration
   std::chrono::milliseconds _eventDuration{};
-  // Component ID
   std::string _componentId;
-  // Component Type
   std::string _componentType;
   // Size in bytes of the data corresponding to this flow file
   uint64_t _size;
-  // flow uuid
   utils::Identifier flow_uuid_;
-  // Offset to the content
   uint64_t _offset;
-  // Full path to the content
   std::string _contentFullPath;
-  // Attributes key/values pairs for the flow record
   std::map<std::string, std::string> _attributes;
   // UUID string for all parents
   std::vector<utils::Identifier> _lineageIdentifiers;
-  // transitUri
   std::string _transitUri;
-  // sourceSystemFlowFileIdentifier
   std::string _sourceSystemFlowFileIdentifier;
-  // parent UUID
   std::vector<utils::Identifier> _parentUuids;
-  // child UUID
   std::vector<utils::Identifier> _childrenUuids;
-  // detail
   std::string _details;
-  // sourceQueueIdentifier
   std::string _sourceQueueIdentifier;
-  // relationship
   std::string _relationship;
-  // alternateIdentifierUri;
   std::string _alternateIdentifierUri;
 
  private:
@@ -402,13 +373,8 @@ class ProvenanceEventRecord : public core::SerializableComponent {
   static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
-// Provenance Reporter
 class ProvenanceReporter {
  public:
-  // Constructor
-  /*!
-   * Create a new provenance reporter associated with the process session
-   */
   ProvenanceReporter(std::shared_ptr<core::Repository> repo, std::string componentId, std::string componentType)
       : logger_(core::logging::LoggerFactory<ProvenanceReporter>::getLogger()) {
     _componentId = componentId;
@@ -416,59 +382,45 @@ class ProvenanceReporter {
     repo_ = repo;
   }
 
-  // Destructor
   virtual ~ProvenanceReporter() {
     clear();
   }
-  // Get events
-  std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() {
+
+  std::set<std::shared_ptr<ProvenanceEventRecord>> getEvents() const {
     return _events;
   }
-  // Add event
+
   void add(const std::shared_ptr<ProvenanceEventRecord> &event) {
     _events.insert(event);
   }
-  // Remove event
+
   void remove(const std::shared_ptr<ProvenanceEventRecord> &event) {
     if (_events.find(event) != _events.end()) {
       _events.erase(event);
     }
   }
-  //
-  // clear
+
   void clear() {
     _events.clear();
   }
-  // commit
+
   void commit();
-  // create
-  void create(std::shared_ptr<core::FlowFile> flow, std::string detail);
-  // route
-  void route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration);
-  // modifyAttributes
-  void modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail);
-  // modifyContent
-  void modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration);
-  // clone
-  void clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child);
-  // join
-  void join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration);
-  // fork
-  void fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration);
-  // expire
-  void expire(std::shared_ptr<core::FlowFile> flow, std::string detail);
-  // drop
-  void drop(std::shared_ptr<core::FlowFile> flow, std::string reason);
-  // send
-  void send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force);
-  // fetch
-  void fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration);
-  // receive
-  void receive(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, std::chrono::milliseconds processingDuration);
+  void create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
+  void route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration);
+  void modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
+  void modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration);
+  void clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child);
+  void join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child, const std::string& detail, std::chrono::milliseconds processingDuration);
+  void fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent, const std::string& detail, std::chrono::milliseconds processingDuration);
+  void expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail);
+  void drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason);
+  void send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force);
+  void fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration);
+  void receive(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri,
+    const std::string& sourceSystemFlowFileIdentifier, const std::string& detail, std::chrono::milliseconds processingDuration);
 
  protected:
-  // allocate
-  std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, std::shared_ptr<core::FlowFile> flow) {
+  std::shared_ptr<ProvenanceEventRecord> allocate(ProvenanceEventRecord::ProvenanceEventType eventType, const std::shared_ptr<core::FlowFile>& flow) {
     if (repo_->isNoop()) {
       return nullptr;
     }
@@ -480,16 +432,12 @@ class ProvenanceReporter {
     return event;
   }
 
-  // Component ID
   std::string _componentId;
-  // Component Type
   std::string _componentType;
 
  private:
   std::shared_ptr<core::logging::Logger> logger_;
-  // Incoming connection Iterator
   std::set<std::shared_ptr<ProvenanceEventRecord>> _events;
-  // provenance repository.
   std::shared_ptr<core::Repository> repo_;
 
   // Prevent default copy constructor and assignment operation
@@ -498,8 +446,4 @@ class ProvenanceReporter {
   ProvenanceReporter &operator=(const ProvenanceReporter &parent);
 };
 
-// Provenance Repository
-
 }  // namespace org::apache::nifi::minifi::provenance
-
-#endif  // LIBMINIFI_INCLUDE_PROVENANCE_PROVENANCE_H_
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
new file mode 100644
index 000000000..5ab712be3
--- /dev/null
+++ b/libminifi/src/core/Repository.cpp
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "core/Repository.h"
+
+namespace org::apache::nifi::minifi::core {
+
+bool Repository::Delete(std::vector<std::shared_ptr<core::SerializableComponent>> &storedValues) {
+  bool found = true;
+  for (const auto& storedValue : storedValues) {
+    found &= Delete(storedValue->getName());
+  }
+  return found;
+}
+
+bool Repository::storeElement(const std::shared_ptr<core::SerializableComponent> element) {
+  if (!element) {
+    return false;
+  }
+
+  org::apache::nifi::minifi::io::BufferStream outStream;
+
+  element->serialize(outStream);
+
+  if (!Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size())) {
+    logger_->log_error("NiFi Provenance Store event %s size %llu fail", element->getUUIDStr(), outStream.size());
+    return false;
+  }
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
index 950977728..4c25ef5df 100644
--- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
+++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp
@@ -17,8 +17,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 #include <vector>
 #include <queue>
 #include <map>
@@ -164,8 +162,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(const std::shared_ptr<core::Pr
   logging::LOG_DEBUG(logger_) << "batch size " << batch_size_ << " records";
   size_t deserialized = batch_size_;
   std::shared_ptr<core::Repository> repo = context->getProvenanceRepository();
-  std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();};
-  if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) {
+  if (!repo->getElements(records, deserialized) && deserialized == 0) {
     return;
   }
   logging::LOG_DEBUG(logger_) << "Captured " << deserialized << " records";
diff --git a/libminifi/src/core/repository/VolatileRepository.cpp b/libminifi/src/core/repository/VolatileRepository.cpp
index e186d0053..033efffcc 100644
--- a/libminifi/src/core/repository/VolatileRepository.cpp
+++ b/libminifi/src/core/repository/VolatileRepository.cpp
@@ -25,9 +25,6 @@
 
 namespace org::apache::nifi::minifi::core::repository {
 
-void VolatileRepository::loadComponent(const std::shared_ptr<core::ContentRepository>& /*content_repo*/) {
-}
-
 VolatileRepository::~VolatileRepository() {
   for (auto ent : repo_data_.value_vector) {
     delete ent;
@@ -122,45 +119,4 @@ bool VolatileRepository::Get(const std::string &key, std::string &value) {
   return false;
 }
 
-bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store,
-                                     size_t &max_size, std::function<std::shared_ptr<core::SerializableComponent>()> lambda) {
-  size_t requested_batch = max_size;
-  max_size = 0;
-  for (auto ent : repo_data_.value_vector) {
-    // let the destructor do the cleanup
-    RepoValue<std::string> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      std::shared_ptr<core::SerializableComponent> newComponent = lambda();
-      // we've taken ownership of this repo value
-      newComponent->DeSerialize(repo_value.getBuffer());
-      store.push_back(newComponent);
-      repo_data_.current_size -= repo_value.getBuffer().size();
-      if (max_size++ >= requested_batch) {
-        break;
-      }
-    }
-  }
-  return max_size > 0;
-}
-
-bool VolatileRepository::DeSerialize(std::vector<std::shared_ptr<core::SerializableComponent>> &store, size_t &max_size) {
-  logger_->log_debug("VolatileRepository -- DeSerialize %u", repo_data_.current_size.load());
-  max_size = 0;
-  for (auto ent : repo_data_.value_vector) {
-    // let the destructor do the cleanup
-    RepoValue<std::string> repo_value;
-
-    if (ent->getValue(repo_value)) {
-      // we've taken ownership of this repo value
-      store.at(max_size)->DeSerialize(repo_value.getBuffer());
-      repo_data_.current_size -= repo_value.getBuffer().size();
-      if (max_size++ >= store.size()) {
-        break;
-      }
-    }
-  }
-  return max_size > 0;
-}
-
 }  // namespace org::apache::nifi::minifi::core::repository
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index dc705dfc4..c3b9cfefc 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 #include <list>
+#include <utility>
 
 #include "core/Repository.h"
 #include "io/BufferStream.h"
@@ -40,20 +41,17 @@ const char *ProvenanceEventRecord::ProvenanceEventTypeStr[REPLAY + 1] = { "CREAT
     "ATTRIBUTES_MODIFIED", "ROUTE", "ADDINFO", "REPLAY" };
 
 ProvenanceEventRecord::ProvenanceEventRecord(ProvenanceEventRecord::ProvenanceEventType event, std::string componentId, std::string componentType)
-    : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()) {
-  _eventType = event;
-  _componentId = componentId;
-  _componentType = componentType;
-  _eventTime = std::chrono::system_clock::now();
+    : core::SerializableComponent(core::getClassName<ProvenanceEventRecord>()),
+      _eventType(event),
+      _componentId(std::move(componentId)),
+      _componentType(std::move(componentType)),
+      _eventTime(std::chrono::system_clock::now()) {
 }
 
-// DeSerialize
-bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::SerializableComponent> &store) {
+bool ProvenanceEventRecord::loadFromRepository(const std::shared_ptr<core::Repository> &repo) {
   std::string value;
   bool ret;
 
-  const std::shared_ptr<core::Repository> repo = std::dynamic_pointer_cast<core::Repository>(store);
-
   if (nullptr == repo || uuid_.isNil()) {
     logger_->log_error("Repo could not be assigned");
     return false;
@@ -69,7 +67,7 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Serializable
 
   org::apache::nifi::minifi::io::BufferStream stream(value);
 
-  ret = DeSerialize(stream);
+  ret = deserialize(stream);
 
   if (ret) {
     logger_->log_debug("NiFi Provenance retrieve event %s size %llu eventType %d success", getUUIDStr(), stream.size(), _eventType);
@@ -80,68 +78,68 @@ bool ProvenanceEventRecord::DeSerialize(const std::shared_ptr<core::Serializable
   return ret;
 }
 
-bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStream& outStream) {
+bool ProvenanceEventRecord::serialize(io::OutputStream& output_stream) {
   {
-    const auto ret = outStream.write(this->uuid_);
+    const auto ret = output_stream.write(this->uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
     uint32_t eventType = this->_eventType;
-    const auto ret = outStream.write(eventType);
+    const auto ret = output_stream.write(eventType);
     if (ret != 4) {
       return false;
     }
   }
   {
     uint64_t event_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_eventTime.time_since_epoch()).count();
-    const auto ret = outStream.write(event_time_ms);
+    const auto ret = output_stream.write(event_time_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t entry_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_entryDate.time_since_epoch()).count();
-    const auto ret = outStream.write(entry_date_ms);
+    const auto ret = output_stream.write(entry_date_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t event_duration_ms = this->_eventDuration.count();
-    const auto ret = outStream.write(event_duration_ms);
+    const auto ret = output_stream.write(event_duration_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
     uint64_t lineage_start_date_ms = std::chrono::duration_cast<std::chrono::milliseconds>(_lineageStartDate.time_since_epoch()).count();
-    const auto ret = outStream.write(lineage_start_date_ms);
+    const auto ret = output_stream.write(lineage_start_date_ms);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_componentId);
+    const auto ret = output_stream.write(this->_componentId);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_componentType);
+    const auto ret = output_stream.write(this->_componentType);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->flow_uuid_);
+    const auto ret = output_stream.write(this->flow_uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_details);
+    const auto ret = output_stream.write(this->_details);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -149,45 +147,45 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
   // write flow attributes
   {
     const auto numAttributes = gsl::narrow<uint32_t>(this->_attributes.size());
-    const auto ret = outStream.write(numAttributes);
+    const auto ret = output_stream.write(numAttributes);
     if (ret != 4) {
       return false;
     }
   }
   for (const auto& itAttribute : _attributes) {
     {
-      const auto ret = outStream.write(itAttribute.first);
+      const auto ret = output_stream.write(itAttribute.first);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.write(itAttribute.second);
+      const auto ret = output_stream.write(itAttribute.second);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   }
   {
-    const auto ret = outStream.write(this->_contentFullPath);
+    const auto ret = output_stream.write(this->_contentFullPath);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_size);
+    const auto ret = output_stream.write(this->_size);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_offset);
+    const auto ret = output_stream.write(this->_offset);
     if (ret != 8) {
       return false;
     }
   }
   {
-    const auto ret = outStream.write(this->_sourceQueueIdentifier);
+    const auto ret = output_stream.write(this->_sourceQueueIdentifier);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -196,44 +194,44 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
     // write UUIDs
     {
       const auto parent_uuids_count = gsl::narrow<uint32_t>(this->_parentUuids.size());
-      const auto ret = outStream.write(parent_uuids_count);
+      const auto ret = output_stream.write(parent_uuids_count);
       if (ret != 4) {
         return false;
       }
     }
     for (const auto& parentUUID : _parentUuids) {
-      const auto ret = outStream.write(parentUUID);
+      const auto ret = output_stream.write(parentUUID);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto children_uuids_count = gsl::narrow<uint32_t>(this->_childrenUuids.size());
-      const auto ret = outStream.write(children_uuids_count);
+      const auto ret = output_stream.write(children_uuids_count);
       if (ret != 4) {
         return false;
       }
     }
     for (const auto& childUUID : _childrenUuids) {
-      const auto ret = outStream.write(childUUID);
+      const auto ret = output_stream.write(childUUID);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
-    const auto ret = outStream.write(this->_transitUri);
+    const auto ret = output_stream.write(this->_transitUri);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
-      const auto ret = outStream.write(this->_transitUri);
+      const auto ret = output_stream.write(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.write(this->_sourceSystemFlowFileIdentifier);
+      const auto ret = output_stream.write(this->_sourceSystemFlowFileIdentifier);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -243,23 +241,9 @@ bool ProvenanceEventRecord::Serialize(org::apache::nifi::minifi::io::BufferStrea
   return true;
 }
 
-bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableComponent> &repo) {
-  org::apache::nifi::minifi::io::BufferStream outStream;
-
-  Serialize(outStream);
-
-  // Persist to the DB
-  if (!repo->Serialize(getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size())) {
-    logger_->log_error("NiFi Provenance Store event %s size %llu fail", getUUIDStr(), outStream.size());
-  }
-  return true;
-}
-
-bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer) {
-  org::apache::nifi::minifi::io::BufferStream outStream(buffer);
-
+bool ProvenanceEventRecord::deserialize(io::InputStream &input_stream) {
   {
-    const auto ret = outStream.read(uuid_);
+    const auto ret = input_stream.read(uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -267,7 +251,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
 
   uint32_t eventType;
   {
-    const auto ret = outStream.read(eventType);
+    const auto ret = input_stream.read(eventType);
     if (ret != 4) {
       return false;
     }
@@ -276,7 +260,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
   this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
   {
     uint64_t event_time_in_ms;
-    const auto ret = outStream.read(event_time_in_ms);
+    const auto ret = input_stream.read(event_time_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -285,7 +269,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
 
   {
     uint64_t entry_date_in_ms;
-    const auto ret = outStream.read(entry_date_in_ms);
+    const auto ret = input_stream.read(entry_date_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -294,7 +278,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
 
   {
     uint64_t event_duration_ms;
-    const auto ret = outStream.read(event_duration_ms);
+    const auto ret = input_stream.read(event_duration_ms);
     if (ret != 8) {
       return false;
     }
@@ -303,7 +287,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
 
   {
     uint64_t lineage_start_date_in_ms;
-    const auto ret = outStream.read(lineage_start_date_in_ms);
+    const auto ret = input_stream.read(lineage_start_date_in_ms);
     if (ret != 8) {
       return false;
     }
@@ -311,28 +295,28 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
   }
 
   {
-    const auto ret = outStream.read(this->_componentId);
+    const auto ret = input_stream.read(this->_componentId);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_componentType);
+    const auto ret = input_stream.read(this->_componentType);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->flow_uuid_);
+    const auto ret = input_stream.read(this->flow_uuid_);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_details);
+    const auto ret = input_stream.read(this->_details);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -341,7 +325,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
   // read flow attributes
   uint32_t numAttributes = 0;
   {
-    const auto ret = outStream.read(numAttributes);
+    const auto ret = input_stream.read(numAttributes);
     if (ret != 4) {
       return false;
     }
@@ -350,14 +334,14 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     {
-      const auto ret = outStream.read(key);
+      const auto ret = input_stream.read(key);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     std::string value;
     {
-      const auto ret = outStream.read(value);
+      const auto ret = input_stream.read(value);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -366,28 +350,28 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
   }
 
   {
-    const auto ret = outStream.read(this->_contentFullPath);
+    const auto ret = input_stream.read(this->_contentFullPath);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_size);
+    const auto ret = input_stream.read(this->_size);
     if (ret != 8) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_offset);
+    const auto ret = input_stream.read(this->_offset);
     if (ret != 8) {
       return false;
     }
   }
 
   {
-    const auto ret = outStream.read(this->_sourceQueueIdentifier);
+    const auto ret = input_stream.read(this->_sourceQueueIdentifier);
     if (ret == 0 || io::isError(ret)) {
       return false;
     }
@@ -397,7 +381,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
     // read UUIDs
     uint32_t number = 0;
     {
-      const auto ret = outStream.read(number);
+      const auto ret = input_stream.read(number);
       if (ret != 4) {
         return false;
       }
@@ -406,7 +390,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
       {
-        const auto ret = outStream.read(parentUUID);
+        const auto ret = input_stream.read(parentUUID);
         if (ret == 0 || io::isError(ret)) {
           return false;
         }
@@ -415,7 +399,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
     }
     number = 0;
     {
-      const auto ret = outStream.read(number);
+      const auto ret = input_stream.read(number);
       if (ret != 4) {
         return false;
       }
@@ -423,7 +407,7 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
       {
-        const auto ret = outStream.read(childUUID);
+        const auto ret = input_stream.read(childUUID);
         if (ret == 0 || io::isError(ret)) {
           return false;
         }
@@ -432,20 +416,20 @@ bool ProvenanceEventRecord::DeSerialize(const gsl::span<const std::byte> buffer)
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     {
-      const auto ret = outStream.read(this->_transitUri);
+      const auto ret = input_stream.read(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
-      const auto ret = outStream.read(this->_transitUri);
+      const auto ret = input_stream.read(this->_transitUri);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
-      const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
+      const auto ret = input_stream.read(this->_sourceSystemFlowFileIdentifier);
       if (ret == 0 || io::isError(ret)) {
         return false;
       }
@@ -469,14 +453,14 @@ void ProvenanceReporter::commit() {
 
   for (auto& event : _events) {
     std::unique_ptr<io::BufferStream> stramptr(new io::BufferStream());
-    event->Serialize(*stramptr);
+    event->serialize(*stramptr);
 
     flowData.emplace_back(event->getUUIDStr(), std::move(stramptr));
   }
   repo_->MultiPut(flowData);
 }
 
-void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::string detail) {
+void ProvenanceReporter::create(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::CREATE, flow);
 
   if (event) {
@@ -485,7 +469,7 @@ void ProvenanceReporter::create(std::shared_ptr<core::FlowFile> flow, std::strin
   }
 }
 
-void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relationship relation, std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::route(const std::shared_ptr<core::FlowFile>& flow, const core::Relationship& relation, const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::ROUTE, flow);
 
   if (event) {
@@ -496,7 +480,7 @@ void ProvenanceReporter::route(std::shared_ptr<core::FlowFile> flow, core::Relat
   }
 }
 
-void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow, std::string detail) {
+void ProvenanceReporter::modifyAttributes(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::ATTRIBUTES_MODIFIED, flow);
 
   if (event) {
@@ -505,7 +489,7 @@ void ProvenanceReporter::modifyAttributes(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::modifyContent(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::CONTENT_MODIFIED, flow);
 
   if (event) {
@@ -515,7 +499,7 @@ void ProvenanceReporter::modifyContent(std::shared_ptr<core::FlowFile> flow, std
   }
 }
 
-void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shared_ptr<core::FlowFile> child) {
+void ProvenanceReporter::clone(const std::shared_ptr<core::FlowFile>& parent, const std::shared_ptr<core::FlowFile>& child) {
   auto event = allocate(ProvenanceEventRecord::CLONE, parent);
 
   if (event) {
@@ -525,15 +509,14 @@ void ProvenanceReporter::clone(std::shared_ptr<core::FlowFile> parent, std::shar
   }
 }
 
-void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > parents, std::shared_ptr<core::FlowFile> child, std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::join(const std::vector<std::shared_ptr<core::FlowFile>>& parents, const std::shared_ptr<core::FlowFile>& child,
+    const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::JOIN, child);
 
   if (event) {
     event->addChildFlowFile(child);
-    std::vector<std::shared_ptr<core::FlowFile> >::iterator it;
-    for (it = parents.begin(); it != parents.end(); it++) {
-      std::shared_ptr<core::FlowFile> record = *it;
-      event->addParentFlowFile(record);
+    for (const auto& parent : parents) {
+      event->addParentFlowFile(parent);
     }
     event->setDetails(detail);
     event->setEventDuration(processingDuration);
@@ -541,15 +524,14 @@ void ProvenanceReporter::join(std::vector<std::shared_ptr<core::FlowFile> > pare
   }
 }
 
-void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > child, std::shared_ptr<core::FlowFile> parent, std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::fork(const std::vector<std::shared_ptr<core::FlowFile>>& children, const std::shared_ptr<core::FlowFile>& parent,
+    const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::FORK, parent);
 
   if (event) {
     event->addParentFlowFile(parent);
-    std::vector<std::shared_ptr<core::FlowFile> >::iterator it;
-    for (it = child.begin(); it != child.end(); it++) {
-      std::shared_ptr<core::FlowFile> record = *it;
-      event->addChildFlowFile(record);
+    for (const auto& child : children) {
+      event->addChildFlowFile(child);
     }
     event->setDetails(detail);
     event->setEventDuration(processingDuration);
@@ -557,7 +539,7 @@ void ProvenanceReporter::fork(std::vector<std::shared_ptr<core::FlowFile> > chil
   }
 }
 
-void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::string detail) {
+void ProvenanceReporter::expire(const std::shared_ptr<core::FlowFile>& flow, const std::string& detail) {
   auto event = allocate(ProvenanceEventRecord::EXPIRE, flow);
 
   if (event) {
@@ -566,7 +548,7 @@ void ProvenanceReporter::expire(std::shared_ptr<core::FlowFile> flow, std::strin
   }
 }
 
-void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string reason) {
+void ProvenanceReporter::drop(const std::shared_ptr<core::FlowFile>& flow, const std::string& reason) {
   auto event = allocate(ProvenanceEventRecord::DROP, flow);
 
   if (event) {
@@ -576,7 +558,7 @@ void ProvenanceReporter::drop(std::shared_ptr<core::FlowFile> flow, std::string
   }
 }
 
-void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration, bool force) {
+void ProvenanceReporter::send(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration, bool force) {
   auto event = allocate(ProvenanceEventRecord::SEND, flow);
 
   if (event) {
@@ -587,15 +569,15 @@ void ProvenanceReporter::send(std::shared_ptr<core::FlowFile> flow, std::string
       add(event);
     } else {
       if (!repo_->isFull())
-        event->Serialize(repo_);
+        repo_->storeElement(event);
     }
   }
 }
 
-void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
-                                 std::string transitUri,
-                                 std::string sourceSystemFlowFileIdentifier,
-                                 std::string detail,
+void ProvenanceReporter::receive(const std::shared_ptr<core::FlowFile>& flow,
+                                 const std::string& transitUri,
+                                 const std::string& sourceSystemFlowFileIdentifier,
+                                 const std::string& detail,
                                  std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::RECEIVE, flow);
 
@@ -608,7 +590,7 @@ void ProvenanceReporter::receive(std::shared_ptr<core::FlowFile> flow,
   }
 }
 
-void ProvenanceReporter::fetch(std::shared_ptr<core::FlowFile> flow, std::string transitUri, std::string detail, std::chrono::milliseconds processingDuration) {
+void ProvenanceReporter::fetch(const std::shared_ptr<core::FlowFile>& flow, const std::string& transitUri, const std::string& detail, std::chrono::milliseconds processingDuration) {
   auto event = allocate(ProvenanceEventRecord::FETCH, flow);
 
   if (event) {
diff --git a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
index 7a3f06557..09e3d4e0b 100644
--- a/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
+++ b/libminifi/test/rocksdb-tests/ProvenanceTests.cpp
@@ -35,128 +35,128 @@ namespace provenance = minifi::provenance;
 using namespace std::literals::chrono_literals;
 
 TEST_CASE("Test Provenance record create", "[Testprovenance::ProvenanceEventRecord]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
-  REQUIRE(record1.getAttributes().empty());
-  REQUIRE(record1.getAlternateIdentifierUri().length() == 0);
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "blah", "blahblah");
+  REQUIRE(record1->getAttributes().empty());
+  REQUIRE(record1->getAlternateIdentifierUri().length() == 0);
 }
 
 TEST_CASE("Test Provenance record serialization", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record2.getEventId() == record1.getEventId());
-  REQUIRE(record2.getComponentId() == record1.getComponentId());
-  REQUIRE(record2.getComponentType() == record1.getComponentType());
-  REQUIRE(record2.getDetails() == record1.getDetails());
-  REQUIRE(record2.getDetails() == smileyface);
-  REQUIRE(record2.getEventDuration() == sample);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record2->getEventId() == record1->getEventId());
+  REQUIRE(record2->getComponentId() == record1->getComponentId());
+  REQUIRE(record2->getComponentType() == record1->getComponentType());
+  REQUIRE(record2->getDetails() == record1->getDetails());
+  REQUIRE(record2->getDetails() == smileyface);
+  REQUIRE(record2->getEventDuration() == sample);
 }
 
 TEST_CASE("Test Flowfile record added to provenance", "[TestFlowAndProv1]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
-  utils::Identifier eventId = record1.getEventId();
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  utils::Identifier eventId = record1->getEventId();
   std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
   ffr1->setAttribute("potato", "potatoe");
   ffr1->setAttribute("tomato", "tomatoe");
 
-  record1.addChildFlowFile(ffr1);
+  record1->addChildFlowFile(ffr1);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<TestRepository>();
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record1.getChildrenUuids().size() == 1);
-  REQUIRE(record2.getChildrenUuids().size() == 1);
-  utils::Identifier childId = record2.getChildrenUuids().at(0);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record1->getChildrenUuids().size() == 1);
+  REQUIRE(record2->getChildrenUuids().size() == 1);
+  utils::Identifier childId = record2->getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
-  record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().empty());
+  record2->removeChildUuid(childId);
+  REQUIRE(record2->getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization Volatile", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
 
   std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record2.getEventId() == record1.getEventId());
-  REQUIRE(record2.getComponentId() == record1.getComponentId());
-  REQUIRE(record2.getComponentType() == record1.getComponentType());
-  REQUIRE(record2.getDetails() == record1.getDetails());
-  REQUIRE(record2.getDetails() == smileyface);
-  REQUIRE(record2.getEventDuration() == sample);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record2->getEventId() == record1->getEventId());
+  REQUIRE(record2->getComponentId() == record1->getComponentId());
+  REQUIRE(record2->getComponentType() == record1->getComponentType());
+  REQUIRE(record2->getDetails() == record1->getDetails());
+  REQUIRE(record2->getDetails() == smileyface);
+  REQUIRE(record2->getEventDuration() == sample);
 }
 
 TEST_CASE("Test Flowfile record added to provenance using Volatile Repo", "[TestFlowAndProv1]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
-  utils::Identifier eventId = record1.getEventId();
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CLONE, "componentid", "componenttype");
+  utils::Identifier eventId = record1->getEventId();
   std::shared_ptr<minifi::FlowFileRecord> ffr1 = std::make_shared<minifi::FlowFileRecord>();
   ffr1->setAttribute("potato", "potatoe");
   ffr1->setAttribute("tomato", "tomatoe");
 
-  record1.addChildFlowFile(ffr1);
+  record1->addChildFlowFile(ffr1);
 
   auto sample = 65555ms;
   std::shared_ptr<core::Repository> testRepository = std::make_shared<core::repository::VolatileProvenanceRepository>();
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
-
-  record1.Serialize(testRepository);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == true);
-  REQUIRE(record1.getChildrenUuids().size() == 1);
-  REQUIRE(record2.getChildrenUuids().size() == 1);
-  utils::Identifier childId = record2.getChildrenUuids().at(0);
+  record1->setEventDuration(sample);
+
+  testRepository->storeElement(record1);
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == true);
+  REQUIRE(record1->getChildrenUuids().size() == 1);
+  REQUIRE(record2->getChildrenUuids().size() == 1);
+  utils::Identifier childId = record2->getChildrenUuids().at(0);
   REQUIRE(childId == ffr1->getUUID());
-  record2.removeChildUuid(childId);
-  REQUIRE(record2.getChildrenUuids().empty());
+  record2->removeChildUuid(childId);
+  REQUIRE(record2->getChildrenUuids().empty());
 }
 
 TEST_CASE("Test Provenance record serialization NoOp", "[Testprovenance::ProvenanceEventRecordSerializeDeser]") {
-  provenance::ProvenanceEventRecord record1(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
+  auto record1 = std::make_shared<provenance::ProvenanceEventRecord>(provenance::ProvenanceEventRecord::ProvenanceEventType::CREATE, "componentid", "componenttype");
 
-  utils::Identifier eventId = record1.getEventId();
+  utils::Identifier eventId = record1->getEventId();
 
   std::string smileyface = ":)";
-  record1.setDetails(smileyface);
+  record1->setDetails(smileyface);
 
   auto sample = 65555ms;
 
   std::shared_ptr<core::Repository> testRepository = core::createRepository("nooprepository");
   testRepository->initialize(nullptr);
-  record1.setEventDuration(sample);
+  record1->setEventDuration(sample);
 
-  REQUIRE(record1.Serialize(testRepository) == true);
-  provenance::ProvenanceEventRecord record2;
-  record2.setEventId(eventId);
-  REQUIRE(record2.DeSerialize(testRepository) == false);
+  REQUIRE(testRepository->storeElement(record1));
+  auto record2 = std::make_shared<provenance::ProvenanceEventRecord>();
+  record2->setEventId(eventId);
+  REQUIRE(record2->loadFromRepository(testRepository) == false);
 }
diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h
index 41ca154a8..400e0e91d 100644
--- a/libminifi/test/unit/ProvenanceTestHelper.h
+++ b/libminifi/test/unit/ProvenanceTestHelper.h
@@ -35,6 +35,7 @@
 #include "properties/Configure.h"
 #include "provenance/Provenance.h"
 #include "SwapManager.h"
+#include "io/BufferStream.h"
 
 using namespace std::literals::chrono_literals;
 
@@ -73,10 +74,6 @@ class TestRepositoryBase : public T_BaseRepository {
     return true;
   }
 
-  bool Serialize(const std::string &key, const uint8_t *buffer, const size_t bufferSize) override {
-    return Put(key, buffer, bufferSize);
-  }
-
   bool Delete(const std::string& key) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     repository_results_.erase(key);
@@ -94,11 +91,7 @@ class TestRepositoryBase : public T_BaseRepository {
     }
   }
 
-  bool Serialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>>& /*store*/, size_t /*max_size*/) override {
-    return false;
-  }
-
-  bool DeSerialize(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override {
+  bool getElements(std::vector<std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>> &store, size_t &max_size) override {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     max_size = 0;
     for (const auto &entry : repository_results_) {
@@ -106,27 +99,20 @@ class TestRepositoryBase : public T_BaseRepository {
         break;
       }
       const auto eventRead = store.at(max_size);
-      eventRead->DeSerialize(gsl::make_span(entry.second).template as_span<const std::byte>());
+      org::apache::nifi::minifi::io::BufferStream stream(gsl::make_span(entry.second).template as_span<const std::byte>());
+      eventRead->deserialize(stream);
       ++max_size;
     }
     return true;
   }
 
-  bool Serialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent>& /*store*/) override {
-    return false;
-  }
-
-  bool DeSerialize(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> &store) override {
-    std::string value;
-    Get(store->getUUIDStr(), value);
-    store->DeSerialize(gsl::make_span(value).as_span<const std::byte>());
+  bool storeElement(const std::shared_ptr<org::apache::nifi::minifi::core::SerializableComponent> element) override {
+    org::apache::nifi::minifi::io::BufferStream outStream;
+    element->serialize(outStream);
+    Put(element->getUUIDStr(), const_cast<uint8_t*>(outStream.getBuffer().as_span<const uint8_t>().data()), outStream.size());
     return true;
   }
 
-  bool DeSerialize(gsl::span<const std::byte>) override {
-    return false;
-  }
-
   std::map<std::string, std::string> getRepoMap() const {
     std::lock_guard<std::mutex> lock{repository_results_mutex_};
     return repository_results_;
@@ -199,9 +185,6 @@ class TestFlowRepository : public org::apache::nifi::minifi::core::ThreadedRepos
     }
   }
 
-  void loadComponent(const std::shared_ptr<org::apache::nifi::minifi::core::ContentRepository>& /*content_repo*/) override {
-  }
-
  private:
   void run() override {
   }