You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/05/03 15:06:04 UTC

[nifi-minifi-cpp] 01/03: MINIFICPP-1717 Refactor *StreamCallback to be function objects

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3070b855038a6f1ccd58b6537f4f5580681d3dd9
Author: Marton Szasz <sz...@apache.org>
AuthorDate: Tue May 3 14:44:53 2022 +0200

    MINIFICPP-1717 Refactor *StreamCallback to be function objects
    
    Closes #1236
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 extensions/aws/processors/FetchS3Object.cpp        | 28 ++++---
 extensions/aws/processors/FetchS3Object.h          | 21 ------
 extensions/aws/processors/PutS3Object.cpp          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  4 +-
 .../azure/processors/FetchAzureBlobStorage.cpp     | 40 +++-------
 .../azure/processors/FetchAzureDataLakeStorage.cpp | 13 +++-
 .../azure/processors/FetchAzureDataLakeStorage.h   | 28 -------
 .../azure/processors/PutAzureBlobStorage.cpp       |  2 +-
 extensions/azure/processors/PutAzureBlobStorage.h  |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  6 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  2 +-
 extensions/bustache/ApplyTemplate.cpp              | 58 ++++++---------
 extensions/bustache/ApplyTemplate.h                |  5 +-
 extensions/civetweb/processors/ListenHTTP.cpp      | 15 +---
 extensions/civetweb/processors/ListenHTTP.h        | 10 ---
 extensions/gcp/processors/PutGCSObject.cpp         |  6 +-
 extensions/http-curl/client/HTTPCallback.h         |  6 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |  8 +-
 extensions/http-curl/protocols/RESTSender.cpp      |  6 +-
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |  1 -
 extensions/jni/jvm/JniProcessSession.cpp           | 29 +++-----
 extensions/jni/jvm/JniReferenceObjects.h           | 21 +-----
 extensions/libarchive/CompressContent.cpp          | 14 ++--
 extensions/libarchive/CompressContent.h            | 62 ++++++----------
 extensions/libarchive/FocusArchiveEntry.cpp        |  9 +--
 extensions/libarchive/FocusArchiveEntry.h          |  5 +-
 extensions/libarchive/ManipulateArchive.cpp        |  7 +-
 extensions/libarchive/MergeContent.cpp             | 11 +--
 extensions/libarchive/MergeContent.h               | 12 +--
 extensions/libarchive/UnfocusArchiveEntry.cpp      |  4 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |  4 +-
 extensions/librdkafka/ConsumeKafka.cpp             | 10 +--
 extensions/librdkafka/ConsumeKafka.h               | 11 ---
 extensions/librdkafka/PublishKafka.cpp             |  8 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 17 ++++-
 extensions/mqtt/processors/ConsumeMQTT.h           | 22 ------
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |  2 +-
 extensions/mqtt/processors/ConvertJSONAck.cpp      | 17 ++---
 extensions/mqtt/processors/ConvertJSONAck.h        |  1 -
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  5 +-
 extensions/opc/include/fetchopc.h                  | 13 +---
 extensions/opc/src/fetchopc.cpp                    |  5 +-
 extensions/opc/src/putopc.cpp                      |  1 +
 extensions/opencv/CaptureRTSPFrame.cpp             |  9 ++-
 extensions/opencv/CaptureRTSPFrame.h               | 32 +-------
 extensions/opencv/FrameIO.h                        | 79 --------------------
 extensions/opencv/MotionDetector.cpp               | 22 ++++--
 .../SourceInitiatedSubscriptionListener.cpp        | 25 +------
 .../SourceInitiatedSubscriptionListener.h          | 23 +-----
 extensions/pdh/PerformanceDataMonitor.cpp          |  4 +-
 extensions/procfs/processors/ProcFsMonitor.cpp     |  4 +-
 extensions/script/lua/LuaProcessSession.cpp        | 12 ++-
 extensions/script/lua/LuaProcessSession.h          | 32 --------
 extensions/script/python/PyProcessSession.cpp      | 10 ++-
 extensions/script/python/PyProcessSession.h        | 30 --------
 extensions/sensors/GetEnvironmentalSensors.cpp     |  3 +-
 extensions/sensors/GetMovementSensors.cpp          |  3 +-
 extensions/sensors/SensorBase.h                    | 21 ------
 extensions/sftp/processors/FetchSFTP.cpp           | 23 ++----
 extensions/sftp/processors/FetchSFTP.h             | 13 ----
 extensions/sftp/processors/PutSFTP.cpp             | 31 +++-----
 extensions/sftp/processors/PutSFTP.h               | 15 ----
 extensions/splunk/PutSplunkHTTP.cpp                |  7 +-
 extensions/sql/processors/FlowFileSource.cpp       |  3 +-
 .../processors/AttributesToJSON.cpp                |  3 +-
 .../processors/AttributesToJSON.h                  | 11 ---
 .../processors/DefragmentText.cpp                  | 37 ++--------
 .../processors/ExecuteProcess.cpp                  | 14 ++--
 .../processors/ExecuteProcess.h                    | 17 -----
 .../standard-processors/processors/ExtractText.cpp | 14 ++--
 .../standard-processors/processors/ExtractText.h   |  6 +-
 .../processors/GenerateFlowFile.cpp                |  6 +-
 .../processors/GenerateFlowFile.h                  | 13 ----
 .../standard-processors/processors/GetFile.cpp     |  3 +-
 .../standard-processors/processors/GetTCP.cpp      |  4 +-
 extensions/standard-processors/processors/GetTCP.h | 19 -----
 .../standard-processors/processors/HashContent.cpp | 27 +++----
 .../standard-processors/processors/HashContent.h   | 11 ---
 .../processors/LogAttribute.cpp                    |  7 +-
 .../standard-processors/processors/LogAttribute.h  | 19 -----
 .../standard-processors/processors/PutFile.cpp     |  4 +-
 .../standard-processors/processors/PutFile.h       |  6 +-
 .../standard-processors/processors/ReplaceText.cpp | 36 +--------
 .../standard-processors/processors/RouteText.cpp   |  6 +-
 .../standard-processors/processors/TailFile.cpp    | 12 +--
 .../tests/unit/DefragmentTextTests.cpp             | 16 +---
 extensions/usb-camera/GetUSBCamera.cpp             | 39 +++-------
 extensions/usb-camera/GetUSBCamera.h               | 15 +---
 .../CollectorInitiatedSubscription.cpp             | 18 +----
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 18 +----
 libminifi/include/core/ProcessSession.h            | 19 ++---
 .../include/core/ProcessSessionReadCallback.h      |  4 +-
 .../io/StreamCallback.h}                           | 27 +++----
 libminifi/include/io/StreamPipe.h                  | 86 +++++-----------------
 .../include/serialization/FlowFileSerializer.h     |  5 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 54 --------------
 libminifi/include/utils/ByteArrayCallback.h        | 17 ++---
 libminifi/include/utils/FileReaderCallback.h       |  8 +-
 libminifi/include/utils/HTTPClient.h               |  2 +-
 libminifi/include/utils/JsonCallback.h             | 14 ++--
 .../utils/LineByLineInputOutputStreamCallback.h    |  4 +-
 libminifi/include/utils/StringUtils.h              |  1 +
 libminifi/src/c2/C2Agent.cpp                       |  2 +-
 libminifi/src/core/ProcessSession.cpp              | 74 +++++++++----------
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  2 +-
 .../src/serialization/FlowFileV3Serializer.cpp     |  3 +-
 libminifi/src/serialization/PayloadSerializer.cpp  |  3 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 13 +++-
 libminifi/src/utils/ByteArrayCallback.cpp          |  4 +-
 libminifi/src/utils/FileReaderCallback.cpp         | 29 ++++----
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  2 +-
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   | 21 +-----
 libminifi/test/TestBase.cpp                        |  2 +-
 libminifi/test/WriteToFlowFileTestProcessor.cpp    | 19 +----
 .../test/archive-tests/CompressContentTests.cpp    |  6 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    | 42 +++++------
 .../test/persistence-tests/PersistenceTests.cpp    |  3 +-
 .../test/unit/ContentRepositoryDependentTests.h    | 34 +++------
 libminifi/test/unit/FlowFileSerializationTests.cpp |  8 +-
 .../LineByLineInputOutputStreamCallbackTests.cpp   |  6 +-
 libminifi/test/unit/LoggerTests.cpp                |  2 +-
 124 files changed, 507 insertions(+), 1354 deletions(-)

diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index e6c7abe2e..af93ebe55 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -26,6 +26,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
@@ -112,10 +113,13 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     return;
   }
 
-  WriteCallback callback(*get_object_params, s3_wrapper_);
-  session->write(flow_file, &callback);
+  std::optional<minifi::aws::s3::GetObjectResult> result;
+  session->write(flow_file, [&get_object_params, &result, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    result = s3_wrapper_.getObject(*get_object_params, *stream);
+    return (result | minifi::utils::map(&s3::GetObjectResult::write_size)).value_or(0);
+  });
 
-  if (callback.result_) {
+  if (result) {
     auto putAttributeIfNotEmpty = [&](const std::string& attribute, const std::string& value) {
       if (!value.empty()) {
         session->putAttribute(flow_file, attribute, value);
@@ -124,15 +128,15 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
 
     logger_->log_debug("Successfully fetched S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
     session->putAttribute(flow_file, "s3.bucket", get_object_params->bucket);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::PATH, callback.result_->path);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, callback.result_->absolute_path);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, callback.result_->filename);
-    putAttributeIfNotEmpty(core::SpecialFlowAttribute::MIME_TYPE, callback.result_->mime_type);
-    putAttributeIfNotEmpty("s3.etag", callback.result_->etag);
-    putAttributeIfNotEmpty("s3.expirationTime", callback.result_->expiration.expiration_time);
-    putAttributeIfNotEmpty("s3.expirationTimeRuleId", callback.result_->expiration.expiration_time_rule_id);
-    putAttributeIfNotEmpty("s3.sseAlgorithm", callback.result_->ssealgorithm);
-    putAttributeIfNotEmpty("s3.version", callback.result_->version);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::PATH, result->path);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, result->absolute_path);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, result->filename);
+    putAttributeIfNotEmpty(core::SpecialFlowAttribute::MIME_TYPE, result->mime_type);
+    putAttributeIfNotEmpty("s3.etag", result->etag);
+    putAttributeIfNotEmpty("s3.expirationTime", result->expiration.expiration_time);
+    putAttributeIfNotEmpty("s3.expirationTimeRuleId", result->expiration.expiration_time_rule_id);
+    putAttributeIfNotEmpty("s3.sseAlgorithm", result->ssealgorithm);
+    putAttributeIfNotEmpty("s3.version", result->version);
     session->transfer(flow_file, Success);
   } else {
     logger_->log_error("Failed to fetch S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 4a2411d7c..4c16e5e71 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -64,27 +64,6 @@ class FetchS3Object : public S3Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
-      : get_object_params_(get_object_params),
-        s3_wrapper_(s3_wrapper) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      result_ = s3_wrapper_.getObject(get_object_params_, *stream);
-      if (!result_) {
-        return 0;
-      }
-
-      return result_->write_size;
-    }
-
-    const minifi::aws::s3::GetObjectRequestParameters& get_object_params_;
-    aws::s3::S3Wrapper& s3_wrapper_;
-    std::optional<minifi::aws::s3::GetObjectResult> result_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp
index 4027b5a6a..09b9ba337 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -281,7 +281,7 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
   }
 
   PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   if (!callback.result_.has_value()) {
     logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
     session->transfer(flow_file, Failure);
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 0ba71b187..2d1575502 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -78,7 +78,7 @@ class PutS3Object : public S3Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     static const uint64_t MAX_SIZE;
     static const uint64_t BUFFER_SIZE;
@@ -89,7 +89,7 @@ class PutS3Object : public S3Processor {
       , s3_wrapper_(s3_wrapper) {
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       if (flow_size_ > MAX_SIZE) {
         return -1;
       }
diff --git a/extensions/azure/processors/FetchAzureBlobStorage.cpp b/extensions/azure/processors/FetchAzureBlobStorage.cpp
index 31f987db1..5359ce599 100644
--- a/extensions/azure/processors/FetchAzureBlobStorage.cpp
+++ b/extensions/azure/processors/FetchAzureBlobStorage.cpp
@@ -42,34 +42,6 @@ const core::Property FetchAzureBlobStorage::RangeLength(
 const core::Relationship FetchAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship");
 const core::Relationship FetchAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship");
 
-namespace {
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(storage::AzureBlobStorage& azure_blob_storage, const storage::FetchAzureBlobStorageParameters& params)
-    : azure_blob_storage_(azure_blob_storage),
-      params_(params) {
-  }
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    result_size_ = azure_blob_storage_.fetchBlob(params_, *stream);
-    if (!result_size_) {
-      return 0;
-    }
-
-    return gsl::narrow<int64_t>(*result_size_);
-  }
-
-  [[nodiscard]] auto getResult() const {
-    return result_size_;
-  }
-
- private:
-  storage::AzureBlobStorage& azure_blob_storage_;
-  const storage::FetchAzureBlobStorageParameters& params_;
-  std::optional<uint64_t> result_size_ = std::nullopt;
-};
-}  // namespace
-
 void FetchAzureBlobStorage::initialize() {
   setSupportedProperties({
     AzureStorageCredentialsService,
@@ -127,10 +99,16 @@ void FetchAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext
   }
 
   auto fetched_flow_file = session->create(flow_file);
-  WriteCallback callback(azure_blob_storage_, *params);
-  session->write(fetched_flow_file, &callback);
+  std::optional<int64_t> result_size;
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    result_size = azure_blob_storage_.fetchBlob(*params, *stream);
+    if (!result_size) {
+      return 0;
+    }
+    return gsl::narrow<int64_t>(*result_size);
+  });
 
-  if (callback.getResult() == std::nullopt) {
+  if (result_size == std::nullopt) {
     logger_->log_error("Failed to fetch blob '%s' from Azure Blob storage", params->blob_name);
     session->transfer(flow_file, Failure);
     session->remove(fetched_flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
index 007147770..c27eb19ab 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -109,10 +109,17 @@ void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessCon
   }
 
   auto fetched_flow_file = session->create(flow_file);
-  WriteCallback callback(azure_data_lake_storage_, *params, logger_);
-  session->write(fetched_flow_file, &callback);
+  std::optional<uint64_t> result;
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    result = azure_data_lake_storage_.fetchFile(*params, *output_stream);
+    if (!result) {
+      return 0;
+    }
+
+    return gsl::narrow<int64_t>(*result);
+  });
 
-  if (callback.getResult() == std::nullopt) {
+  if (result == std::nullopt) {
     logger_->log_error("Failed to fetch file '%s' from Azure Data Lake storage", params->filename);
     session->transfer(flow_file, Failure);
     session->remove(fetched_flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 3c2d08374..ec84b699b 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -55,34 +55,6 @@ class FetchAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessor
  private:
   friend class ::AzureDataLakeStorageTestsFixture<FetchAzureDataLakeStorage>;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::FetchAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger)
-      : azure_data_lake_storage_(azure_data_lake_storage),
-        params_(params),
-        logger_(std::move(logger)) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      result_size_ = azure_data_lake_storage_.fetchFile(params_, *stream);
-      if (!result_size_) {
-        return 0;
-      }
-
-      return gsl::narrow<int64_t>(*result_size_);
-    }
-
-    auto getResult() const {
-      return result_size_;
-    }
-
-   private:
-    storage::AzureDataLakeStorage& azure_data_lake_storage_;
-    const storage::FetchAzureDataLakeStorageParameters& params_;
-    std::optional<uint64_t> result_size_ = std::nullopt;
-    std::shared_ptr<core::logging::Logger> logger_;
-  };
-
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
   }
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 2e741b953..5cb70767b 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -99,7 +99,7 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext>
     }
   }
   PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), azure_blob_storage_, *params);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   const std::optional<storage::UploadBlobResult> upload_result = callback.getResult();
 
   if (!upload_result) {
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index 52e0ca22e..1a25fbf7a 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -53,7 +53,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, storage::AzureBlobStorage& azure_blob_storage, const storage::PutAzureBlobStorageParameters& params)
       : flow_size_(flow_size)
@@ -61,7 +61,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
       , params_(params) {
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       std::vector<std::byte> buffer;
       buffer.resize(flow_size_);
       size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index bf53088bd..7df24ec4a 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -100,7 +100,7 @@ void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessConte
   }
 
   PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), azure_data_lake_storage_, *params, logger_);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   const storage::UploadDataLakeStorageResult result = callback.getResult();
 
   if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) {
@@ -138,7 +138,7 @@ PutAzureDataLakeStorage::ReadCallback::ReadCallback(
     logger_(std::move(logger)) {
 }
 
-int64_t PutAzureDataLakeStorage::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutAzureDataLakeStorage::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   std::vector<std::byte> buffer;
   buffer.resize(flow_size_);
   size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 4dddcf6cd..d6d312892 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -61,12 +61,12 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
  private:
   friend class ::AzureDataLakeStorageTestsFixture<PutAzureDataLakeStorage>;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
-    storage::UploadDataLakeStorageResult getResult() const {
+    [[nodiscard]] storage::UploadDataLakeStorageResult getResult() const {
       return result_;
     }
 
diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp
index fb00421ff..14e26b959 100644
--- a/extensions/azure/storage/AzureBlobStorage.cpp
+++ b/extensions/azure/storage/AzureBlobStorage.cpp
@@ -78,7 +78,7 @@ bool AzureBlobStorage::deleteBlob(const DeleteAzureBlobStorageParameters& params
 std::optional<uint64_t> AzureBlobStorage::fetchBlob(const FetchAzureBlobStorageParameters& params, io::BaseStream& stream) {
   try {
     auto fetch_res = blob_storage_client_->fetchBlob(params);
-    return internal::pipe(fetch_res.get(), &stream);
+    return internal::pipe(*fetch_res, stream);
   } catch (const std::exception& ex) {
     logger_->log_error("An exception occurred while fetching blob '%s' of container '%s': %s", params.blob_name, params.container_name, ex.what());
     return std::nullopt;
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 0ba2aeb73..1546b8f05 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -95,7 +95,7 @@ bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters
 std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) {
   try {
     auto result = data_lake_storage_client_->fetchFile(params);
-    return internal::pipe(result.get(), &stream);
+    return internal::pipe(*result, stream);
   } catch (const std::exception& ex) {
     logger_->log_error("An exception occurred while fetching '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
     return std::nullopt;
diff --git a/extensions/bustache/ApplyTemplate.cpp b/extensions/bustache/ApplyTemplate.cpp
index 1107f8db3..4b2041566 100644
--- a/extensions/bustache/ApplyTemplate.cpp
+++ b/extensions/bustache/ApplyTemplate.cpp
@@ -34,41 +34,6 @@ namespace org::apache::nifi::minifi::processors {
 const core::Property ApplyTemplate::Template("Template", "Path to the input mustache template file", "");
 const core::Relationship ApplyTemplate::Success("success", "success operational on the flow record");
 
-namespace {
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(std::filesystem::path templateFile, const core::FlowFile& flow_file)
-      :template_file_{std::move(templateFile)}, flow_file_{flow_file}
-  {}
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    logger_->log_info("ApplyTemplate reading template file from %s", template_file_);
-    // TODO(szaszm): we might want to return to memory-mapped input files when the next todo is done. Until then, the agents stores the whole result in memory anyway, so no point in not doing the same
-    // with the template file itself
-    const auto template_file_contents = [this] {
-      std::ifstream ifs{template_file_};
-      return std::string{std::istreambuf_iterator<char>{ifs}, std::istreambuf_iterator<char>{}};
-    }();
-
-    bustache::format format(template_file_contents);
-    bustache::object data;
-
-    for (const auto &attr : flow_file_.getAttributes()) {
-      data[attr.first] = attr.second;
-    }
-
-    // TODO(calebj) write ostream reciever for format() to prevent excessive copying
-    std::string ostring = to_string(format(data));
-    stream->write(gsl::make_span(ostring).as_span<const std::byte>());
-    return gsl::narrow<int64_t>(ostring.length());
-  }
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ApplyTemplate>::getLogger();
-  std::filesystem::path template_file_;
-  const core::FlowFile& flow_file_;
-};
-}  // namespace
-
 void ApplyTemplate::initialize() {
   setSupportedProperties({Template});
   setSupportedRelationships({Success});
@@ -83,8 +48,27 @@ void ApplyTemplate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
 
   std::string template_file;
   context->getProperty(Template, template_file, flow_file);
-  WriteCallback cb(template_file, *flow_file);
-  session->write(flow_file, &cb);
+  session->write(flow_file, [&template_file, &flow_file, this](const auto& output_stream) {
+    logger_->log_info("ApplyTemplate reading template file from %s", template_file);
+    // TODO(szaszm): we might want to return to memory-mapped input files when the next todo is done. Until then, the agents stores the whole result in memory anyway, so no point in not doing the same
+    // with the template file itself
+    const auto template_file_contents = [&] {
+      std::ifstream ifs{template_file};
+      return std::string{std::istreambuf_iterator<char>{ifs}, std::istreambuf_iterator<char>{}};
+    }();
+
+    bustache::format format(template_file_contents);
+    bustache::object data;
+
+    for (const auto &attr : flow_file->getAttributes()) {
+      data[attr.first] = attr.second;
+    }
+
+    // TODO(calebj) write ostream reciever for format() to prevent excessive copying
+    std::string ostring = to_string(format(data));
+    output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
+    return gsl::narrow<int64_t>(ostring.length());
+  });
   session->transfer(flow_file, Success);
 }
 
diff --git a/extensions/bustache/ApplyTemplate.h b/extensions/bustache/ApplyTemplate.h
index 5cc1d5a63..e0033172b 100644
--- a/extensions/bustache/ApplyTemplate.h
+++ b/extensions/bustache/ApplyTemplate.h
@@ -35,16 +35,15 @@ class ApplyTemplate : public core::Processor {
  public:
   explicit ApplyTemplate(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid) {}
-  ~ApplyTemplate() override = default;
   static constexpr char const *ProcessorName = "ApplyTemplate";
 
   static const core::Property Template;
 
   static const core::Relationship Success;
 
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                 const std::shared_ptr<core::ProcessSession> &session) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
+
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ApplyTemplate>::getLogger();
 };
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index d88f0082a..52a0ae55e 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -281,8 +281,10 @@ void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
     session->add(flow_file);
 
     if (flow_file_buffer_pair.second) {
-      WriteCallback callback(std::move(flow_file_buffer_pair.second));
-      session->write(flow_file, &callback);
+      session->write(flow_file, [request_content = flow_file_buffer_pair.second.get()](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+        const auto write_ret = stream->write(request_content->getBuffer());
+        return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
+      });
     }
 
     session->transfer(flow_file, Success);
@@ -504,15 +506,6 @@ std::unique_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struc
   return content_buffer;
 }
 
-ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> request_content)
-    : request_content_(std::move(request_content)) {
-}
-
-int64_t ListenHTTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  const auto write_ret = stream->write(request_content_->getBuffer());
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 bool ListenHTTP::isSecure() const {
   return (listeningPort.length() > 0) && *listeningPort.rbegin() == 's';
 }
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index 76c9e2c32..683fc0dc8 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -126,16 +126,6 @@ class ListenHTTP : public core::Processor {
     utils::ConcurrentQueue<FlowFileBufferPair> request_buffer_;
   };
 
-  // Write callback for transferring data from HTTP request to content repo
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(std::unique_ptr<io::BufferStream>);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::unique_ptr<io::BufferStream> request_content_;
-  };
-
   static int logMessage(const struct mg_connection *conn, const char *message) {
     try {
       struct mg_context* ctx = mg_get_context(conn);
diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp
index 9fadfedfe..00a3d0c26 100644
--- a/extensions/gcp/processors/PutGCSObject.cpp
+++ b/extensions/gcp/processors/PutGCSObject.cpp
@@ -114,7 +114,7 @@ const core::Relationship PutGCSObject::Failure("failure", "Files that could not
 
 
 namespace {
-class UploadToGCSCallback : public InputStreamCallback {
+class UploadToGCSCallback {
  public:
   UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
       : bucket_(std::move(bucket)),
@@ -122,7 +122,7 @@ class UploadToGCSCallback : public InputStreamCallback {
         client_(client) {
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     std::string content;
     content.resize(stream->size());
     const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>());
@@ -283,7 +283,7 @@ void PutGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& contex
 
   callback.setEncryptionKey(encryption_key_);
 
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   auto& result = callback.getResult();
   if (!result.ok()) {
     flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason());
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 83df0b31f..c05ad61d1 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -53,7 +53,7 @@ namespace utils {
  *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
  *    the current buffer
  */
-class HttpStreamingCallback : public ByteInputCallBack {
+class HttpStreamingCallback final : public ByteInputCallback {
  public:
   HttpStreamingCallback()
       : is_alive_(true),
@@ -78,7 +78,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
     seekInner(lock, pos);
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override {
     std::vector<std::byte> vec;
 
     if (stream->size() > 0) {
@@ -89,7 +89,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
     return processInner(std::move(vec));
   }
 
-  virtual int64_t process(const uint8_t* data, size_t size) {
+  int64_t process(const uint8_t* data, size_t size) {
     std::vector<std::byte> vec;
     vec.resize(size);
     memcpy(vec.data(), data, size);
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 1604ef288..3a4516883 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -294,7 +294,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   std::string tx_id = generateId();
 
   // Note: callback must be declared before callbackObj so that they are destructed in the correct order
-  std::unique_ptr<utils::ByteInputCallBack> callback = nullptr;
+  std::unique_ptr<utils::ByteInputCallback> callback = nullptr;
   std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
 
   // Client declared after the callbacks to make sure the callbacks are still available when the client is destructed
@@ -324,11 +324,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
     logger_->log_trace("InvokeHTTP -- reading flowfile");
     std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
     if (claim) {
-      callback = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+      callback = std::make_unique<utils::ByteInputCallback>();
       if (send_body_) {
-        session->read(flowFile, callback.get());
+        session->read(flowFile, std::ref(*callback));
       }
-      callbackObj = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+      callbackObj = std::make_unique<utils::HTTPUploadCallback>();
       callbackObj->ptr = callback.get();
       callbackObj->pos = 0;
       logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index af15b4dbf..c94e1c0a4 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -89,7 +89,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
   }
 
   // Callback for transmit. Declared in order to destruct in proper order - take care!
-  std::vector<std::unique_ptr<utils::ByteInputCallBack>> inputs;
+  std::vector<std::unique_ptr<utils::ByteInputCallback>> inputs;
   std::vector<std::unique_ptr<utils::HTTPUploadCallback>> callbacks;
 
   // Callback for transfer. Declared in order to destruct in proper order - take care!
@@ -113,7 +113,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
         if (filename.empty()) {
           throw std::logic_error("Missing filename");
         }
-        auto file_input = std::make_unique<utils::ByteInputCallBack>();
+        auto file_input = std::make_unique<utils::ByteInputCallback>();
         auto file_cb = std::make_unique<utils::HTTPUploadCallback>();
         file_input->write(file.getRawDataAsString());
         file_cb->ptr = file_input.get();
@@ -122,7 +122,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
         callbacks.push_back(std::move(file_cb));
       }
     } else {
-      auto data_input = std::make_unique<utils::ByteInputCallBack>();
+      auto data_input = std::make_unique<utils::ByteInputCallback>();
       auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
       data_input->write(data.value_or(""));
       data_cb->ptr = data_input.get();
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index eb850f6fb..159aad707 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -49,7 +49,6 @@ class HttpTestHarness : public HTTPIntegrationBase {
     LogTestController::getInstance().setTrace<minifi::processors::InvokeHTTP>();
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP>();
-    LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP::WriteCallback>();
     LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP::Handler>();
     LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
     LogTestController::getInstance().setDebug<core::Processor>();
diff --git a/extensions/jni/jvm/JniProcessSession.cpp b/extensions/jni/jvm/JniProcessSession.cpp
index a82d44e36..5f4f0105c 100644
--- a/extensions/jni/jvm/JniProcessSession.cpp
+++ b/extensions/jni/jvm/JniProcessSession.cpp
@@ -18,21 +18,16 @@
 
 #include "JniProcessSession.h"
 
-#include <string>
 #include <memory>
+#include <string>
 #include <algorithm>
-#include <iterator>
 #include <set>
 #include <utility>
-#include "core/Property.h"
-#include "io/validation.h"
 #include "utils/StringUtils.h"
 #include "utils/file/FileUtils.h"
-#include "properties/Configure.h"
 #include "JVMLoader.h"
 #include "JniReferenceObjects.h"
 
-#include "core/Processor.h"
 #include "JniFlowFile.h"
 #include "../JavaException.h"
 
@@ -76,21 +71,15 @@ JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_readF
   minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
   if (ptr->get()) {
     auto jincls = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniInputStream", env);
-
     auto jin = jincls.newInstance(env);
-
     minifi::jni::ThrowIf(env);
 
-    std::unique_ptr<minifi::jni::JniByteInputStream> callback = std::unique_ptr<minifi::jni::JniByteInputStream>(new minifi::jni::JniByteInputStream(4096));
-
-    session->getSession()->read(ptr->get(), callback.get());
-
+    auto callback = std::make_unique<minifi::jni::JniByteInputStream>(4096);
+    session->getSession()->read(ptr->get(), std::ref(*callback));
     auto jniInpuStream = std::make_shared<minifi::jni::JniInputStream>(std::move(callback), jin, session->getServicer());
-
     session->addInputStream(jniInpuStream);
 
     minifi::jni::JVMLoader::getInstance()->setReference(jin, env, jniInpuStream.get());
-
     return jin;
   }
 
@@ -133,8 +122,9 @@ JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_writ
     jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
     jsize length = env->GetArrayLength(byteArray);
 
-    minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
-    session->getSession()->write(ptr->get(), &outStream);
+    if (length > 0) {
+      session->getSession()->writeBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length)));
+    }
 
     env->ReleaseByteArrayElements(byteArray, buffer, 0);
 
@@ -436,16 +426,15 @@ JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_appe
     return false;
   }
   THROW_IF((ff == nullptr || byteArray == nullptr), env, NO_FF_OBJECT);
-  minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
-  minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
+  auto *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
+  auto *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
 
   if (ptr->get()) {
     jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
     jsize length = env->GetArrayLength(byteArray);
 
     if (length > 0) {
-      minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
-      session->getSession()->append(ptr->get(), &outStream);
+      session->getSession()->appendBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length)));
     }
 
     env->ReleaseByteArrayElements(byteArray, buffer, 0);
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 66e98333a..366e89546 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -96,33 +96,16 @@ struct check_empty_ff : public std::unary_function<std::shared_ptr<JniFlowFile>,
   }
 };
 
-class JniByteOutStream : public minifi::OutputStreamCallback {
- public:
-  JniByteOutStream(jbyte *bytes, size_t length)
-      : bytes_(bytes),
-        length_(length) {
-  }
-
-  virtual ~JniByteOutStream() = default;
-  virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
-    const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(bytes_), length_);
-    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-  }
- private:
-  jbyte *bytes_;
-  size_t length_;
-};
-
 /**
  * Jni byte input stream
  */
-class JniByteInputStream : public minifi::InputStreamCallback {
+class JniByteInputStream {
  public:
   explicit JniByteInputStream(uint64_t size)
       : buffer_(size),
         read_size_(0) {
   }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     stream_ = stream;
     return 0;
   }
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 874eb72c0..c03e63809 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -187,7 +187,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
         if (!compressor.newEntry({filename, in->size()})) {
           return -1;
         }
-        return internal::pipe(in.get(), &compressor);
+        return internal::pipe(*in, compressor);
       };
     } else {
       transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
@@ -196,7 +196,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
           success = false;
           return 0;  // prevents a session rollback
         }
-        auto ret = internal::pipe(&decompressor, out.get());
+        auto ret = internal::pipe(decompressor, *out);
         if (ret < 0) {
           success = false;
           return 0;  // prevents a session rollback
@@ -204,14 +204,14 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
         return ret;
       };
     }
-    session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
-      return session->read(flowFile, FunctionInputStreamCallback([&] (const auto& in) {
+    session->write(result, [&] (const auto& out) {
+      return session->read(flowFile, [&] (const auto& in) {
         return transformer(in, out);
-      }));
-    }));
+      });
+    });
   } else {
     CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
-    session->write(result, &callback);
+    session->write(result, std::ref(callback));
     success = callback.success_;
   }
 
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index b485d52f6..f41adba00 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -88,7 +88,7 @@ class CompressContent : public core::Processor {
   )
 
  public:
-  class GzipWriteCallback : public OutputStreamCallback {
+  class GzipWriteCallback {
    public:
     GzipWriteCallback(CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
       : compress_mode_(std::move(compress_mode))
@@ -104,47 +104,33 @@ class CompressContent : public core::Processor {
     std::shared_ptr<core::ProcessSession> session_;
     bool success_{false};
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& outputStream) override {
-      class ReadCallback : public InputStreamCallback {
-       public:
-        ReadCallback(GzipWriteCallback& writer, std::shared_ptr<io::OutputStream> outputStream)
-          : writer_(writer)
-          , outputStream_(std::move(outputStream)) {
-        }
-
-        int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override {
-          std::vector<std::byte> buffer(16 * 1024U);
-          size_t read_size = 0;
-          while (read_size < writer_.flow_->getSize()) {
-            const auto ret = inputStream->read(buffer);
-            if (io::isError(ret)) {
-              return -1;
-            } else if (ret == 0) {
-              break;
-            } else {
-              const auto writeret = outputStream_->write(gsl::make_span(buffer).subspan(0, ret));
-              if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
-                return -1;
-              }
-              read_size += ret;
-            }
-          }
-          outputStream_->close();
-          return gsl::narrow<int64_t>(read_size);
-        }
-
-        GzipWriteCallback& writer_;
-        std::shared_ptr<io::OutputStream> outputStream_;
-      };
-
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
       std::shared_ptr<io::ZlibBaseStream> filterStream;
       if (compress_mode_ == CompressionMode::Compress) {
-        filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
+        filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
       } else {
-        filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP);
+        filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP);
       }
-      ReadCallback readCb(*this, filterStream);
-      session_->read(flow_, &readCb);
+      session_->read(flow_, [this, &filterStream](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+        std::vector<std::byte> buffer(16 * 1024U);
+        size_t read_size = 0;
+        while (read_size < flow_->getSize()) {
+          const auto ret = input_stream->read(buffer);
+          if (io::isError(ret)) {
+            return -1;
+          } else if (ret == 0) {
+            break;
+          } else {
+            const auto writeret = filterStream->write(gsl::make_span(buffer).subspan(0, ret));
+            if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
+              return -1;
+            }
+            read_size += ret;
+          }
+        }
+        filterStream->close();
+        return gsl::narrow<int64_t>(read_size);
+      });
 
       success_ = filterStream->isFinished();
 
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 43caa9cfa..b38378b9c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -72,8 +72,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
   context->getProperty(Path.getName(), archiveMetadata.focusedEntry);
   flowFile->getAttribute("filename", archiveMetadata.archiveName);
 
-  ReadCallback cb(this, &file_man, &archiveMetadata);
-  session->read(flowFile, &cb);
+  session->read(flowFile, ReadCallback{this, &file_man, &archiveMetadata});
 
   // For each extracted entry, import & stash to key
   std::string targetEntryStashKey;
@@ -164,7 +163,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
   return gsl::narrow<la_ssize_t>(read);
 }
 
-int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   auto inputArchive = archive_read_new();
   struct archive_entry *entry;
   int64_t nlen = 0;
@@ -184,7 +183,7 @@ int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseS
     return nlen;
   }
 
-  while (isRunning()) {
+  while (proc_->isRunning()) {
     res = archive_read_next_header(inputArchive, &entry);
 
     if (res == ARCHIVE_EOF) {
@@ -255,8 +254,6 @@ FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor, utils:
   _archiveMetadata = archiveMetadata;
 }
 
-FocusArchiveEntry::ReadCallback::~ReadCallback() = default;
-
 REGISTER_RESOURCE(FocusArchiveEntry, "Allows manipulation of entries within an archive (e.g. TAR) by focusing on one entry within the archive at a time. "
     "When an archive entry is focused, that entry is treated as the content of the FlowFile and may be manipulated independently of the rest of the archive."
     " To restore the FlowFile to its original state, use UnfocusArchiveEntry.");
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 1c35e2b5e..9b5d3e16e 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -64,11 +64,10 @@ class FocusArchiveEntry : public core::Processor {
   //! Initialize, over write by NiFi FocusArchiveEntry
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     explicit ReadCallback(core::Processor*, utils::file::FileManager *file_man, ArchiveMetadata *archiveMetadata);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
     bool isRunning() {return proc_->isRunning();}
 
    private:
diff --git a/extensions/libarchive/ManipulateArchive.cpp b/extensions/libarchive/ManipulateArchive.cpp
index cabf103b7..b62dcbab2 100644
--- a/extensions/libarchive/ManipulateArchive.cpp
+++ b/extensions/libarchive/ManipulateArchive.cpp
@@ -126,8 +126,7 @@ void ManipulateArchive::onTrigger(core::ProcessContext* /*context*/, core::Proce
     ArchiveMetadata archiveMetadata;
     utils::file::FileManager file_man;
 
-    FocusArchiveEntry::ReadCallback readCallback(this, &file_man, &archiveMetadata);
-    session->read(flowFile, &readCallback);
+    session->read(flowFile, FocusArchiveEntry::ReadCallback{this, &file_man, &archiveMetadata});
 
     auto entries_end = archiveMetadata.entryMetadata.end();
 
@@ -210,9 +209,7 @@ void ManipulateArchive::onTrigger(core::ProcessContext* /*context*/, core::Proce
         archiveMetadata.entryMetadata.insert(position, touchEntry);
     }
 
-    UnfocusArchiveEntry::WriteCallback writeCallback(&archiveMetadata);
-    session->write(flowFile, &writeCallback);
-
+    session->write(flowFile, UnfocusArchiveEntry::WriteCallback{&archiveMetadata});
     session->transfer(flowFile, Success);
 }
 
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 15a508694..d0f4461ab 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -295,7 +295,7 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
     return false;
   }
 
-  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
     return session->read(ff, cb);
   };
 
@@ -349,8 +349,7 @@ BinaryConcatenationMerge::BinaryConcatenationMerge(const std::string &header, co
 
 void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  BinaryConcatenationMerge::WriteCallback callback(header_, footer_, demarcator_, flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer});
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
@@ -363,8 +362,7 @@ void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::Pr
 
 void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
@@ -380,8 +378,7 @@ void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se
 
 void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index ca139cf0f..7cb6b24fe 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -21,6 +21,7 @@
 
 #include <deque>
 #include <map>
+#include <utility>
 #include <vector>
 #include <memory>
 #include <string>
@@ -72,7 +73,7 @@ class BinaryConcatenationMerge : public MergeBin {
   void merge(core::ProcessContext *context, core::ProcessSession *session,
       std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) override;
   // Nest Callback Class for write stream
-  class WriteCallback: public OutputStreamCallback {
+  class WriteCallback {
    public:
     WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
         std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) :
@@ -85,7 +86,7 @@ class BinaryConcatenationMerge : public MergeBin {
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
     FlowFileSerializer& serializer_;
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
       size_t write_size_sum = 0;
       if (!header_.empty()) {
         const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(header_.data()), header_.size());
@@ -160,16 +161,15 @@ class ArchiveMerge {
     bool header_emitted_{false};
   };
   // Nest Callback Class for write stream
-  class WriteCallback: public OutputStreamCallback {
+  class WriteCallback {
    public:
     WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
-        : merge_type_(merge_type),
+        : merge_type_(std::move(merge_type)),
           flows_(flows),
           serializer_(serializer) {
       size_ = 0;
       stream_ = nullptr;
     }
-    ~WriteCallback() override = default;
 
     std::string merge_type_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
@@ -199,7 +199,7 @@ class ArchiveMerge {
       return totalWrote;
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       struct archive *arch;
 
       arch = archive_write_new();
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index c32d2a600..2f6c1ac74 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -138,7 +138,7 @@ void UnfocusArchiveEntry::onTrigger(core::ProcessContext *context, core::Process
 
   // Create archive by restoring each entry in the archive from tmp files
   WriteCallback cb(&lensArchiveMetadata);
-  session->write(flowFile, &cb);
+  session->write(flowFile, std::cref(cb));
 
   // Transfer to the relationship
   session->transfer(flowFile, Success);
@@ -158,7 +158,7 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *
   return io::isError(write_ret) ? -1 : gsl::narrow<la_ssize_t>(write_ret);
 }
 
-int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   auto outputArchive = archive_write_new();
   int64_t nlen = 0;
 
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index 6ada17eb3..905e886f5 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -64,10 +64,10 @@ class UnfocusArchiveEntry : public core::Processor {
   void initialize() override;
 
   //! Write callback for reconstituting lensed archive into flow file content
-  class WriteCallback : public OutputStreamCallback {
+  class WriteCallback {
    public:
     explicit WriteCallback(ArchiveMetadata *archiveMetadata);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
    private:
     //! Logger
     std::shared_ptr<Logger> logger_ = core::logging::LoggerFactory<UnfocusArchiveEntry>::getLogger();
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index 555d6e6d8..5182db4bb 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -493,8 +493,7 @@ std::optional<std::vector<std::shared_ptr<FlowFileRecord>>> ConsumeKafka::transf
         return {};
       }
       // flowfile content is consumed here
-      WriteCallback stream_writer_callback(&flowfile_content[0], flowfile_content.size());
-      session.write(flow_file, &stream_writer_callback);
+      session.writeBuffer(flow_file, flowfile_content);
       for (const auto& kv : attributes_from_headers) {
         flow_file->setAttribute(kv.first, kv.second);
       }
@@ -538,13 +537,6 @@ void ConsumeKafka::onTrigger(core::ProcessContext* /* context */, core::ProcessS
   process_pending_messages(*session);
 }
 
-int64_t ConsumeKafka::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!data_) return 0;
-  const auto write_ret = stream->write(data_, dataSize_);
-  if (io::isError(write_ret)) return -1;
-  return gsl::narrow<int64_t>(write_ret);
-}
-
 REGISTER_RESOURCE(ConsumeKafka, "Consumes messages from Apache Kafka and transform them into MiNiFi FlowFiles. "
     "The application should make sure that the processor is triggered at regular intervals, even if no messages are expected, "
     "to serve any queued callbacks waiting to be called. Rebalancing can also only happen on trigger."); // NOLINT
diff --git a/extensions/librdkafka/ConsumeKafka.h b/extensions/librdkafka/ConsumeKafka.h
index bb7ebee2d..208bd1988 100644
--- a/extensions/librdkafka/ConsumeKafka.h
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -141,17 +141,6 @@ class ConsumeKafka : public KafkaProcessorBase {
   void process_pending_messages(core::ProcessSession& session);
 
  private:
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(char *data, uint64_t size) :
-        data_(reinterpret_cast<uint8_t*>(data)),
-        dataSize_(size) {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
-   private:
-    uint8_t* data_;
-    uint64_t dataSize_;
-  };
-
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 39111795b..72756be72 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -262,7 +262,7 @@ class PublishKafka::Messages {
 };
 
 namespace {
-class ReadCallback : public InputStreamCallback {
+class ReadCallback {
  public:
   struct rd_kafka_headers_deleter {
     void operator()(rd_kafka_headers_t* ptr) const noexcept {
@@ -357,7 +357,7 @@ class ReadCallback : public InputStreamCallback {
   ReadCallback(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback) = delete;
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     std::vector<std::byte> buffer;
 
     buffer.resize(max_seg_size_);
@@ -870,11 +870,11 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
 
     ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
                                         attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
-    session->read(flowFile, &callback);
+    session->read(flowFile, std::ref(callback));
 
     if (!callback.called_) {
       // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
-      callback.process(nullptr);
+      callback(nullptr);
     }
 
     if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index ba60d2a80..8290c8308 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -94,9 +94,20 @@ void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   while (!msg_queue.empty()) {
     MQTTClient_message *message = msg_queue.front();
     std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    ConsumeMQTT::WriteCallback callback(message);
-    session->write(processFlowFile, &callback);
-    if (callback.status_ < 0) {
+    int write_status{};
+    session->write(processFlowFile, [message, &write_status](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+      if (message->payloadlen < 0) {
+        write_status = -1;
+        return -1;
+      }
+      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
+      if (io::isError(len)) {
+        write_status = -1;
+        return -1;
+      }
+      return gsl::narrow<int64_t>(len);
+    });
+    if (write_status < 0) {
       logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
       session->remove(processFlowFile);
     } else {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index f76c2af08..cad8461af 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -72,28 +72,6 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
 
   static core::Relationship Success;
 
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(MQTTClient_message *message)
-        : message_(message) {
-    }
-    MQTTClient_message *message_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (message_->payloadlen < 0) {
-        status_ = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), gsl::narrow<size_t>(message_->payloadlen));
-      if (io::isError(len)) {
-        status_ = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    }
-    int status_ = 0;
-  };
-
  public:
   /**
    * Function that's executed when the processor is scheduled.
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
index 5b7e14cd9..69446b5de 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -58,7 +58,7 @@ void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &co
       minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
       byteCallback.write(const_cast<char*>(serialized.c_str()), serialized.size());
       auto newff = session->create();
-      session->write(newff, &byteCallback);
+      session->write(newff, std::ref(byteCallback));
       session->transfer(newff, Success);
       received_heartbeat = true;
     } else {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp
index d46c53124..1d44905ec 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.cpp
+++ b/extensions/mqtt/processors/ConvertJSONAck.cpp
@@ -33,14 +33,10 @@ std::string ConvertJSONAck::parseTopicName(const std::string &json) {
 
   try {
     rapidjson::ParseResult ok = root.Parse(json.c_str());
-    if (ok) {
-      if (root.HasMember("agentInfo")) {
-        if (root["agentInfo"].HasMember("identifier")) {
-          std::stringstream topicStr;
-          topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
-          return topicStr.str();
-        }
-      }
+    if (ok && root.HasMember("agentInfo") && root["agentInfo"].HasMember("identifier")) {
+      std::stringstream topicStr;
+      topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
+      return topicStr.str();
     }
   } catch (...) {
   }
@@ -68,6 +64,7 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
     topic = parseTopicName(to_string(read_result));
     session->transfer(flow, Success);
   }
+
   flow = session->get();
   if (!flow) {
     return;
@@ -76,8 +73,8 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   if (!topic.empty()) {
     const auto read_result = session->readBuffer(flow);
     c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true);
-    auto payload = parseJsonResponse(response_payload, read_result.buffer);
-    auto stream = c2::PayloadSerializer::serialize(1, payload);
+    const auto payload = parseJsonResponse(response_payload, read_result.buffer);
+    const auto stream = c2::PayloadSerializer::serialize(1, payload);
     mqtt_service_->send(topic, stream->getBuffer());
   }
 
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 49cccea35..bfe458b56 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -28,7 +28,6 @@
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "ConvertBase.h"
-#include "utils/gsl.h"
 
 
 namespace org::apache::nifi::minifi::processors {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 1a3686e8b..54cfd0aed 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -90,7 +90,7 @@ void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   }
 
   PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, topic_, client_, qos_, retain_, delivered_token_);
-  session->read(flowFile, &callback);
+  session->read(flowFile, std::ref(callback));
   if (callback.status_ < 0) {
     logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
     session->transfer(flowFile, Failure);
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 834a386f9..ff58f61a0 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -65,7 +65,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
   static core::Relationship Success;
 
   // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, int qos, bool retain, MQTTClient_deliveryToken &token)
         : flow_size_(flow_size),
@@ -78,8 +78,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       read_size_ = 0;
     }
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
       gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index 64bc44354..d430ec7e9 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -63,7 +63,7 @@ class FetchOPCProcessor : public BaseOPCProcessor {
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  void initialize(void) override;
+  void initialize() override;
 
  protected:
   bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
@@ -71,17 +71,6 @@ class FetchOPCProcessor : public BaseOPCProcessor {
 
   void OPCData2FlowFile(const opc::NodeData& opcnode, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
 
-  class WriteCallback : public OutputStreamCallback {
-    std::string data_;
-   public:
-    explicit WriteCallback(std::string&& data)
-      : data_(data) {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.c_str()), data_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
   std::string nodeID_;
   int32_t nameSpaceIdx_;
   opc::OPCNodeIDType idType_;
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index cedade7b8..60295436e 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -216,10 +216,9 @@ namespace processors {
     for (const auto& attr : opcnode.attributes) {
       flowFile->setAttribute(attr.first, attr.second);
     }
-    if (opcnode.data.size() > 0) {
+    if (!opcnode.data.empty()) {
       try {
-        FetchOPCProcessor::WriteCallback callback(opc::nodeValue2String(opcnode));
-        session->write(flowFile, &callback);
+        session->writeBuffer(flowFile, opc::nodeValue2String(opcnode));
       } catch (const std::exception& e) {
         std::string browsename;
         flowFile->getAttribute("Browsename", browsename);
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index bb04e8d50..6137b42fc 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -19,6 +19,7 @@
 
 #include <list>
 #include <memory>
+#include <set>
 #include <string>
 
 #include "opc.h"
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index 5b11ab76e..6c0350673 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -148,8 +148,6 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
   // retrieve a frame of your source
   if (video_capture_.read(frame)) {
     if (!frame.empty()) {
-      CaptureRTSPFrameWriteCallback write_cb(frame, image_encoding_);
-
       auto t = std::time(nullptr);
       auto tm = *std::localtime(&t);
 
@@ -161,7 +159,12 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
       session->putAttribute(flow_file, "filename", filename);
       session->putAttribute(flow_file, "video.backend.driver", video_backend_driver_);
 
-      session->write(flow_file, &write_cb);
+      session->write(flow_file, [&frame, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+        std::vector<uchar> image_buf;
+        imencode(image_encoding_, frame, image_buf);
+        const auto ret = output_stream->write(image_buf.data(), image_buf.size());
+        return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+      });
       session->transfer(flow_file, Success);
       logger_->log_info("A frame is captured");
     } else {
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index 3b9c72a8e..f0f472661 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -32,12 +32,7 @@
 #include "utils/gsl.h"
 #include "utils/Export.h"
 
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class CaptureRTSPFrame : public core::Processor {
  public:
@@ -65,25 +60,6 @@ class CaptureRTSPFrame : public core::Processor {
 
   void notifyStop() override;
 
-  class CaptureRTSPFrameWriteCallback : public OutputStreamCallback {
-   public:
-    explicit CaptureRTSPFrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
-        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
-    }
-    ~CaptureRTSPFrameWriteCallback() override = default;
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      imencode(image_encoding_, image_mat_, image_buf_);
-      const auto ret = stream->write(image_buf_.data(), image_buf_.size());
-      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
-    }
-
-   private:
-    std::vector<uchar> image_buf_;
-    cv::Mat image_mat_;
-    std::string image_encoding_;
-  };
-
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<CaptureRTSPFrame>::getLogger();
   std::mutex mutex_;
@@ -132,8 +108,4 @@ class CaptureRTSPFrame : public core::Processor {
 //  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service_;
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}   // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
deleted file mode 100644
index 75c6a90ff..000000000
--- a/extensions/opencv/FrameIO.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <utility>
-#include <vector>
-#include <memory>
-#include <string>
-
-#include "io/StreamPipe.h"
-#include "utils/gsl.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace opencv {
-
-class FrameWriteCallback : public OutputStreamCallback {
- public:
-  explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
-      : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
-  }
-  ~FrameWriteCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    imencode(image_encoding_, image_mat_, image_buf_);
-    const auto ret = stream->write(image_buf_.data(), image_buf_.size());
-    return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
-  }
-
- private:
-  std::vector<uchar> image_buf_;
-  cv::Mat image_mat_;
-  std::string image_encoding_;
-};
-
-class FrameReadCallback : public InputStreamCallback {
- public:
-  explicit FrameReadCallback(cv::Mat &image_mat)
-      : image_mat_(image_mat) {
-  }
-  ~FrameReadCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    std::vector<uchar> image_buf;
-    image_buf.resize(stream->size());
-    const auto ret = stream->read(gsl::make_span(image_buf).as_span<std::byte>());
-    if (ret != stream->size()) {
-      throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
-    }
-    image_mat_ = cv::imdecode(image_buf, -1);
-    return gsl::narrow<int64_t>(ret);
-  }
-
- private:
-  cv::Mat &image_mat_;
-};
-
-} /* namespace opencv */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
index 05f94b0e8..3114b783a 100644
--- a/extensions/opencv/MotionDetector.cpp
+++ b/extensions/opencv/MotionDetector.cpp
@@ -20,7 +20,6 @@
 #include <vector>
 
 #include "MotionDetector.h"
-#include "FrameIO.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
@@ -163,8 +162,16 @@ void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   }
   cv::Mat frame;
 
-  opencv::FrameReadCallback cb(frame);
-  session->read(flow_file, &cb);
+  session->read(flow_file, [&frame](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    std::vector<uchar> image_buf;
+    image_buf.resize(input_stream->size());
+    const auto ret = input_stream->read(gsl::make_span(image_buf).as_span<std::byte>());
+    if (io::isError(ret) || ret != input_stream->size()) {
+      throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
+    }
+    frame = cv::imdecode(image_buf, -1);
+    return gsl::narrow<int64_t>(ret);
+  });
 
   if (frame.empty()) {
     logger_->log_error("Empty frame.");
@@ -192,11 +199,14 @@ void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
 
   detectAndDraw(frame);
 
-  opencv::FrameWriteCallback write_cb(frame, image_encoding_);
-
   session->putAttribute(flow_file, "filename", filename);
 
-  session->write(flow_file, &write_cb);
+  session->write(flow_file, [&frame, this](const auto& output_stream) -> int64_t {
+    std::vector<uchar> image_buf;
+    imencode(image_encoding_, frame, image_buf);
+    const auto ret = output_stream->write(image_buf.data(), image_buf.size());
+    return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+  });
   session->transfer(flow_file, Success);
   logger_->log_trace("Finish motion detecting");
 }
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 5c76d603c..6625f42e1 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -66,11 +66,8 @@ extern "C" {
 #define WSMAN_CUSTOM_ACTION_HEARTBEAT "http://schemas.dmtf.org/wbem/wsman/1/wsman/Heartbeat"
 #define WSMAN_CUSTOM_ACTION_EVENTS "http://schemas.dmtf.org/wbem/wsman/1/wsman/Events"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
+
 core::Property SourceInitiatedSubscriptionListener::ListenHostname(
     core::PropertyBuilder::createProperty("Listen Hostname")->withDescription("The hostname or IP of this machine that will be advertised to event sources to connect to. "
                                                                               "It must be contained as a Subject Alternative Name in the server certificate, "
@@ -602,15 +599,6 @@ bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptionManager(str
   return true;
 }
 
-SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char* text)
-    : text_(text) {
-}
-
-int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH node, void* data) {
   if (data == nullptr) {
     return 1;
@@ -636,8 +624,7 @@ int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNo
       return 1;
     }
 
-    WriteCallback callback(text);
-    session->write(flow_file, &callback);
+    session->writeBuffer(flow_file, std::string_view{text});
 
     session->putAttribute(flow_file, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
     flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_MACHINEID, machine_id);
@@ -906,8 +893,4 @@ void SourceInitiatedSubscriptionListener::notifyStop() {
 REGISTER_RESOURCE(SourceInitiatedSubscriptionListener, "This processor implements a Windows Event Forwarding Source Initiated Subscription server with the help of OpenWSMAN. "
                                                        "Windows hosts can be set up to connect and forward Event Logs to this processor.");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index 6b84ef973..927e81960 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -39,11 +39,7 @@ extern "C" {
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class SourceInitiatedSubscriptionListener : public core::Processor {
  public:
@@ -86,16 +82,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
   class Handler: public CivetHandler {
    public:
     explicit Handler(SourceInitiatedSubscriptionListener& processor);
-    bool handlePost(CivetServer* server, struct mg_connection* conn);
-
-    class WriteCallback : public OutputStreamCallback {
-     public:
-      explicit WriteCallback(char* text);
-      int64_t process(const std::shared_ptr<io::BaseStream>& stream);
-
-     private:
-      char* text_;
-    };
+    bool handlePost(CivetServer* server, struct mg_connection* conn) override;
 
    private:
     SourceInitiatedSubscriptionListener& processor_;
@@ -163,8 +150,4 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
   bool loadState();
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/pdh/PerformanceDataMonitor.cpp b/extensions/pdh/PerformanceDataMonitor.cpp
index 71c9618e5..4e1461d5c 100644
--- a/extensions/pdh/PerformanceDataMonitor.cpp
+++ b/extensions/pdh/PerformanceDataMonitor.cpp
@@ -123,11 +123,11 @@ void PerformanceDataMonitor::onTrigger(core::ProcessContext* context, core::Proc
   }
   if (pretty_output_) {
     utils::PrettyJsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else {
     utils::JsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   }
 }
diff --git a/extensions/procfs/processors/ProcFsMonitor.cpp b/extensions/procfs/processors/ProcFsMonitor.cpp
index 485da9f03..992d6edcd 100644
--- a/extensions/procfs/processors/ProcFsMonitor.cpp
+++ b/extensions/procfs/processors/ProcFsMonitor.cpp
@@ -120,11 +120,11 @@ void ProcFsMonitor::onTrigger(core::ProcessContext*, core::ProcessSession* sessi
 
   if (output_compactness_ == OutputCompactness::PRETTY) {
     utils::PrettyJsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else if (output_compactness_ == OutputCompactness::COMPACT) {
     utils::JsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else {
     throw Exception(GENERAL_EXCEPTION, "Invalid output compactness");
diff --git a/extensions/script/lua/LuaProcessSession.cpp b/extensions/script/lua/LuaProcessSession.cpp
index 4e20bb840..7b9916b8c 100644
--- a/extensions/script/lua/LuaProcessSession.cpp
+++ b/extensions/script/lua/LuaProcessSession.cpp
@@ -74,8 +74,10 @@ void LuaProcessSession::read(const std::shared_ptr<script::ScriptFlowFile> &scri
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  LuaInputStreamCallback lua_callback(input_stream_callback);
-  session_->read(flow_file, &lua_callback);
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    sol::function callback = input_stream_callback["process"];
+    return callback(input_stream_callback, std::make_shared<LuaBaseStream>(input_stream));
+  });
 }
 
 void LuaProcessSession::write(const std::shared_ptr<script::ScriptFlowFile> &script_flow_file,
@@ -90,8 +92,10 @@ void LuaProcessSession::write(const std::shared_ptr<script::ScriptFlowFile> &scr
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  LuaOutputStreamCallback lua_callback(output_stream_callback);
-  session_->write(flow_file, &lua_callback);
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    sol::function callback = output_stream_callback["process"];
+    return callback(output_stream_callback, std::make_shared<LuaBaseStream>(output_stream));
+  });
 }
 
 std::shared_ptr<script::ScriptFlowFile> LuaProcessSession::create() {
diff --git a/extensions/script/lua/LuaProcessSession.h b/extensions/script/lua/LuaProcessSession.h
index 48e387307..27c4709e1 100644
--- a/extensions/script/lua/LuaProcessSession.h
+++ b/extensions/script/lua/LuaProcessSession.h
@@ -53,38 +53,6 @@ class LuaProcessSession {
    */
   void releaseCoreResources();
 
-  class LuaInputStreamCallback : public InputStreamCallback {
-   public:
-    explicit LuaInputStreamCallback(const sol::table &input_stream_callback) {
-      lua_callback_ = input_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto lua_stream = std::make_shared<LuaBaseStream>(stream);
-      sol::function callback = lua_callback_["process"];
-      return callback(lua_callback_, lua_stream);
-    }
-
-   private:
-    sol::table lua_callback_;
-  };
-
-  class LuaOutputStreamCallback : public OutputStreamCallback {
-   public:
-    explicit LuaOutputStreamCallback(const sol::table &output_stream_callback) {
-      lua_callback_ = output_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto lua_stream = std::make_shared<LuaBaseStream>(stream);
-      sol::function callback = lua_callback_["process"];
-      return callback(lua_callback_, lua_stream);
-    }
-
-   private:
-    sol::table lua_callback_;
-  };
-
  private:
   std::vector<std::shared_ptr<script::ScriptFlowFile>> flow_files_;
   std::shared_ptr<core::ProcessSession> session_;
diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp
index b0a5d36a2..78b92dd57 100644
--- a/extensions/script/python/PyProcessSession.cpp
+++ b/extensions/script/python/PyProcessSession.cpp
@@ -80,8 +80,9 @@ void PyProcessSession::read(std::shared_ptr<script::ScriptFlowFile> script_flow_
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  PyInputStreamCallback py_callback(input_stream_callback);
-  session_->read(flow_file, &py_callback);
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    return input_stream_callback.attr("process")(std::make_shared<PyBaseStream>(input_stream)).cast<int64_t>();
+  });
 }
 
 void PyProcessSession::write(std::shared_ptr<script::ScriptFlowFile> script_flow_file,
@@ -96,8 +97,9 @@ void PyProcessSession::write(std::shared_ptr<script::ScriptFlowFile> script_flow
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  PyOutputStreamCallback py_callback(output_stream_callback);
-  session_->write(flow_file, &py_callback);
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    return output_stream_callback.attr("process")(std::make_shared<PyBaseStream>(output_stream)).cast<int64_t>();
+  });
 }
 
 std::shared_ptr<script::ScriptFlowFile> PyProcessSession::create() {
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 1d6867eac..2cdee8a3e 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -58,36 +58,6 @@ class PyProcessSession {
    */
   void releaseCoreResources();
 
-  class PyInputStreamCallback : public InputStreamCallback {
-   public:
-    explicit PyInputStreamCallback(const py::object &input_stream_callback) {
-      py_callback_ = input_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto py_stream = std::make_shared<PyBaseStream>(stream);
-      return py_callback_.attr("process")(py_stream).cast<int64_t>();
-    }
-
-   private:
-    py::object py_callback_;
-  };
-
-  class PyOutputStreamCallback : public OutputStreamCallback {
-   public:
-    explicit PyOutputStreamCallback(const py::object &output_stream_callback) {
-      py_callback_ = output_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto py_stream = std::make_shared<PyBaseStream>(stream);
-      return py_callback_.attr("process")(py_stream).cast<int64_t>();
-    }
-
-   private:
-    py::object py_callback_;
-  };
-
  private:
   std::vector<std::shared_ptr<script::ScriptFlowFile>> flow_files_;
   std::shared_ptr<core::ProcessSession> session_;
diff --git a/extensions/sensors/GetEnvironmentalSensors.cpp b/extensions/sensors/GetEnvironmentalSensors.cpp
index 900095f41..7c62cde10 100644
--- a/extensions/sensors/GetEnvironmentalSensors.cpp
+++ b/extensions/sensors/GetEnvironmentalSensors.cpp
@@ -137,8 +137,7 @@ void GetEnvironmentalSensors::onTrigger(const std::shared_ptr<core::ProcessConte
   }
 
   if (have_sensor) {
-    WriteCallback callback("GetEnvironmentalSensors");
-    session->write(flow_file_, &callback);
+    session->writeBuffer(flow_file_, "GetEnvironmentalSensors");
     session->transfer(flow_file_, Success);
   }
 }
diff --git a/extensions/sensors/GetMovementSensors.cpp b/extensions/sensors/GetMovementSensors.cpp
index 7e15f1a57..3340bf665 100644
--- a/extensions/sensors/GetMovementSensors.cpp
+++ b/extensions/sensors/GetMovementSensors.cpp
@@ -85,8 +85,7 @@ void GetMovementSensors::onTrigger(const std::shared_ptr<core::ProcessContext>&
       logger_->log_trace("Could not read gyroscope");
     }
 
-    WriteCallback callback("GetMovementSensors");
-    session->write(flow_file_, &callback);
+    session->writeBuffer(flow_file_, "GetMovementSensors");
     session->transfer(flow_file_, Success);
   }
 }
diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h
index 6a889b743..b27a8e30f 100644
--- a/extensions/sensors/SensorBase.h
+++ b/extensions/sensors/SensorBase.h
@@ -33,39 +33,18 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-// SensorBase Class
 class SensorBase : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
   explicit SensorBase(const std::string& name, const utils::Identifier& uuid = {})
     : Processor(name, uuid) {
   }
-  // Destructor
   ~SensorBase() override;
-  // Processor Name
   static core::Relationship Success;
-  // Supported Properties
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(std::string data)
-        : data_{std::move(data)} {
-    }
-    std::string data_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (data_.empty()) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.data()), data_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
-
  protected:
   std::optional<RTIMUSettings> settings_;
   std::unique_ptr<RTIMU> imu_;
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
index 1dac1d8bf..322bcd402 100644
--- a/extensions/sftp/processors/FetchSFTP.cpp
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -158,21 +158,6 @@ void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   startKeepaliveThreadIfNeeded();
 }
 
-FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
-                                    utils::SFTPClient& client)
-    : remote_file_(remote_file)
-    , client_(client) {
-}
-
-FetchSFTP::WriteCallback::~WriteCallback() = default;
-
-int64_t FetchSFTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!client_.getFile(remote_file_, *stream)) {
-    throw utils::SFTPException{client_.getLastError()};
-  }
-  return stream->size();
-}
-
 void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   auto flow_file = session->get();
   if (flow_file == nullptr) {
@@ -220,9 +205,13 @@ void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   };
 
   /* Download file */
-  WriteCallback write_callback(remote_file, *client);
   try {
-    session->write(flow_file, &write_callback);
+    session->write(flow_file, [&remote_file, &client](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+      if (!client->getFile(remote_file, *stream)) {
+        throw utils::SFTPException{client->getLastError()};
+      }
+      return gsl::narrow<int64_t>(stream->size());
+    });
   } catch (const utils::SFTPException& ex) {
     logger_->log_debug(ex.what());
     switch (ex.error().value()) {
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
index 5270d4b0c..b8b62a2de 100644
--- a/extensions/sftp/processors/FetchSFTP.h
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -74,19 +74,6 @@ class FetchSFTP : public SFTPProcessorBase {
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(const std::string& remote_file,
-                 utils::SFTPClient& client);
-    ~WriteCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger();
-    const std::string remote_file_;
-    utils::SFTPClient& client_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index a683aa901..d90687465 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -212,26 +212,6 @@ void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, c
   startKeepaliveThreadIfNeeded();
 }
 
-PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
-                                    utils::SFTPClient& client,
-                                    const std::string& conflict_resolution)
-    : target_path_(target_path)
-    , client_(client)
-    , conflict_resolution_(conflict_resolution) {
-}
-
-PutSFTP::ReadCallback::~ReadCallback() = default;
-
-int64_t PutSFTP::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!client_.putFile(target_path_,
-      *stream,
-      conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
-      stream->size() /*expected_size*/)) {
-    throw utils::SFTPException{client_.getLastError()};
-  }
-  return stream->size();
-}
-
 bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   auto flow_file = session->get();
   if (flow_file == nullptr) {
@@ -436,9 +416,16 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   std::string final_target_path = utils::file::concat_path(remote_path, resolved_filename, true /*force_posix*/);
   logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
 
-  ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
   try {
-    session->read(flow_file, &read_callback);
+    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::BaseStream>& stream) {
+      if (!client->putFile(target_path,
+          *stream,
+          conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
+          stream->size() /*expected_size*/)) {
+        throw utils::SFTPException{client->getLastError()};
+      }
+      return gsl::narrow<int64_t>(stream->size());
+    });
   } catch (const utils::SFTPException& ex) {
     logger_->log_debug(ex.what());
     session->transfer(flow_file, Failure);
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index 9f6ff3c16..e723becb3 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -89,21 +89,6 @@ class PutSFTP : public SFTPProcessorBase {
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(const std::string& target_path,
-        utils::SFTPClient& client,
-        const std::string& conflict_resolution);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutSFTP::ReadCallback>::getLogger();
-    const std::string target_path_;
-    utils::SFTPClient& client_;
-    const std::string conflict_resolution_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
index a58ef713f..224715036 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -30,6 +30,7 @@
 #include "client/HTTPClient.h"
 #include "utils/HTTPClient.h"
 #include "utils/OptionalUtils.h"
+#include "utils/ByteArrayCallback.h"
 
 #include "rapidjson/document.h"
 
@@ -131,9 +132,9 @@ void setFlowFileAsPayload(core::ProcessSession& session,
                           core::ProcessContext& context,
                           utils::HTTPClient& client,
                           const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file,
-                          utils::ByteInputCallBack& payload_callback,
+                          utils::ByteInputCallback& payload_callback,
                           utils::HTTPUploadCallback& payload_callback_obj) {
-  session.read(flow_file, &payload_callback);
+  session.read(flow_file, std::ref(payload_callback));
   payload_callback_obj.ptr = &payload_callback;
   payload_callback_obj.pos = 0;
   client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
@@ -160,7 +161,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
   utils::HTTPClient client;
   initializeClient(client, getNetworkLocation().append(getEndpoint(*context, flow_file, client)), getSSLContextService(*context));
 
-  utils::ByteInputCallBack payload_callback;
+  utils::ByteInputCallback payload_callback;
   utils::HTTPUploadCallback payload_callback_obj;
   setFlowFileAsPayload(*session, *context, client, flow_file, payload_callback, payload_callback_obj);
 
diff --git a/extensions/sql/processors/FlowFileSource.cpp b/extensions/sql/processors/FlowFileSource.cpp
index 49c5affd4..f7d7dffd0 100644
--- a/extensions/sql/processors/FlowFileSource.cpp
+++ b/extensions/sql/processors/FlowFileSource.cpp
@@ -53,11 +53,10 @@ void FlowFileSource::FlowFileGenerator::endProcessBatch() {
     return;
   }
 
-  OutputStreamPipe writer{std::make_shared<io::BufferStream>(json_writer_.toString())};
   auto new_flow = session_.create();
   new_flow->addAttribute(FRAGMENT_INDEX, std::to_string(flow_files_.size()));
   new_flow->addAttribute(FRAGMENT_IDENTIFIER, batch_id_.to_string());
-  session_.write(new_flow, &writer);
+  session_.writeBuffer(new_flow, json_writer_.toString());
   flow_files_.push_back(std::move(new_flow));
 }
 
diff --git a/extensions/standard-processors/processors/AttributesToJSON.cpp b/extensions/standard-processors/processors/AttributesToJSON.cpp
index 69dd573d6..49b24ae48 100644
--- a/extensions/standard-processors/processors/AttributesToJSON.cpp
+++ b/extensions/standard-processors/processors/AttributesToJSON.cpp
@@ -167,8 +167,7 @@ void AttributesToJSON::onTrigger(core::ProcessContext* /*context*/, core::Proces
     session->transfer(flow_file, Success);
   } else {
     logger_->log_debug("Writing the following attribute data to flowfile: %s", json_data);
-    AttributesToJSON::WriteCallback callback(json_data);
-    session->write(flow_file, &callback);
+    session->writeBuffer(flow_file, json_data);
     session->transfer(flow_file, Success);
   }
 }
diff --git a/extensions/standard-processors/processors/AttributesToJSON.h b/extensions/standard-processors/processors/AttributesToJSON.h
index 35149adad..b085130b8 100644
--- a/extensions/standard-processors/processors/AttributesToJSON.h
+++ b/extensions/standard-processors/processors/AttributesToJSON.h
@@ -72,17 +72,6 @@ class AttributesToJSON : public core::Processor {
   }
 
  private:
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(const std::string& json_data) : json_data_(json_data) {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(json_data_.data()), json_data_.length());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-   private:
-    std::string json_data_;
-  };
-
   bool isCoreAttributeToBeFiltered(const std::string& attribute) const;
   std::optional<std::unordered_set<std::string>> getAttributesToBeWritten(const core::FlowFile::AttributeMap& flowfile_attributes) const;
   void addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::optional<std::string>& value);
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp
index 3d5411e0a..c6c23d0df 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -165,19 +165,6 @@ void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& origina
 }
 
 namespace {
-class AppendFlowFileToFlowFile : public OutputStreamCallback {
- public:
-  explicit AppendFlowFileToFlowFile(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append, PayloadSerializer& serializer)
-      : flow_file_to_append_(flow_file_to_append), serializer_(serializer) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    return serializer_.serialize(flow_file_to_append_, stream);
-  }
- private:
-  const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append_;
-  PayloadSerializer& serializer_;
-};
-
 void updateAppendedAttributes(core::FlowFile& buffered_ff) {
   std::string base_name, post_name, offset_str;
   if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
@@ -192,18 +179,6 @@ void updateAppendedAttributes(core::FlowFile& buffered_ff) {
   buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
 }
 
-struct ReadFlowFileContent : public InputStreamCallback {
-  std::string content;
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    content.resize(stream->size());
-    const auto ret = stream->read(gsl::make_span(content).as_span<std::byte>());
-    if (io::isError(ret))
-      return -1;
-    return gsl::narrow<int64_t>(ret);
-  }
-};
-
 size_t getSplitPosition(const utils::SMatch& last_match, DefragmentText::PatternLocation pattern_location) {
   size_t split_position = last_match.position(0);
   if (pattern_location == DefragmentText::PatternLocation::END_OF_MESSAGE) {
@@ -218,9 +193,8 @@ bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession *session,
                                                 const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
                                                 std::shared_ptr<core::FlowFile> &split_before_last_pattern,
                                                 std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
-  ReadFlowFileContent read_flow_file_content;
-  session->read(original_flow_file, &read_flow_file_content);
-  auto last_regex_match = utils::getLastRegexMatch(read_flow_file_content.content, pattern_);
+  const auto read_result = session->readBuffer(original_flow_file);
+  auto last_regex_match = utils::getLastRegexMatch(to_string(read_result), pattern_);
   if (!last_regex_match.ready()) {
     split_before_last_pattern = session->clone(original_flow_file);
     split_after_last_pattern = nullptr;
@@ -256,13 +230,14 @@ void DefragmentText::Buffer::append(core::ProcessSession* session, const gsl::no
     store(session, flow_file_to_append);
     return;
   }
-  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
     return session->read(ff, cb);
   };
   PayloadSerializer serializer(flowFileReader);
-  AppendFlowFileToFlowFile append_flow_file_to_flow_file(flow_file_to_append, serializer);
   session->add(buffered_flow_file_);
-  session->append(buffered_flow_file_, &append_flow_file_to_flow_file);
+  session->append(buffered_flow_file_, [&serializer, &flow_file_to_append](const auto& output_stream) -> int64_t {
+    return serializer.serialize(flow_file_to_append, output_stream);
+  });
   updateAppendedAttributes(*buffered_flow_file_);
   session->transfer(buffered_flow_file_, Self);
 
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 29d444d02..b1b5bfba2 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -166,13 +166,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
             if (numRead <= 0)
               break;
             logger_->log_debug("Execute Command Respond %zd", numRead);
-            ExecuteProcess::WriteCallback callback(buffer, gsl::narrow<uint64_t>(numRead));
             auto flowFile = session->create();
             if (!flowFile)
               continue;
             flowFile->addAttribute("command", _command);
             flowFile->addAttribute("command.arguments", _commandArgument);
-            session->write(flowFile, &callback);
+            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
             session->transfer(flowFile, Success);
             session->commit();
           }
@@ -187,16 +186,16 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               if (totalRead > 0) {
                 logger_->log_debug("Execute Command Respond %zu", totalRead);
                 // child exits and close the pipe
-                ExecuteProcess::WriteCallback callback(buffer, totalRead);
+                const auto buffer_span = gsl::make_span(buffer, totalRead);
                 if (!flowFile) {
                   flowFile = session->create();
                   if (!flowFile)
                     break;
                   flowFile->addAttribute("command", _command);
                   flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->write(flowFile, &callback);
+                  session->writeBuffer(flowFile, buffer_span);
                 } else {
-                  session->append(flowFile, &callback);
+                  session->appendBuffer(flowFile, buffer_span);
                 }
                 session->transfer(flowFile, Success);
               }
@@ -205,16 +204,15 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
                 // we reach the max buffer size
                 logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
                   flowFile = session->create();
                   if (!flowFile)
                     continue;
                   flowFile->addAttribute("command", _command);
                   flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->write(flowFile, &callback);
+                  session->writeBuffer(flowFile, buffer);
                 } else {
-                  session->append(flowFile, &callback);
+                  session->appendBuffer(flowFile, buffer);
                 }
                 // Rewind
                 totalRead = 0;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 9960c7528..b6e14a9d3 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -80,23 +80,6 @@ class ExecuteProcess : public core::Processor {
   // Supported Relationships
   static core::Relationship Success;
 
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(char *data, uint64_t size)
-        : _data(data),
-          _dataSize(size) {
-    }
-    char *_data;
-    uint64_t _dataSize;
-    // void process(std::ofstream *stream) {
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (!_data || _dataSize <= 0) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
-
  public:
   // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 68cd6d4e3..6febe23e3 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -107,15 +107,16 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
     return;
   }
 
-  ReadCallback cb(flowFile, context, logger_);
-  session->read(flowFile, &cb);
+  session->read(flowFile, ReadCallback{flowFile, context, logger_});
   session->transfer(flowFile, Success);
 }
 
-int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   size_t read_size = 0;
   bool regex_mode;
   size_t size_limit = flowFile_->getSize();
+  std::vector<std::byte> buffer;
+  buffer.resize(std::min(gsl::narrow<size_t>(flowFile_->getSize()), MAX_BUFFER_SIZE));
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -131,8 +132,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
 
   while (read_size < size_limit) {
     // Don't read more than config limit or the size of the buffer
-    const auto length = std::min(size_limit - read_size, buffer_.size());
-    const auto ret = stream->read(gsl::make_span(buffer_).subspan(0, length));
+    const auto length = std::min(size_limit - read_size, buffer.size());
+    const auto ret = stream->read(gsl::make_span(buffer).subspan(0, length));
 
     if (io::isError(ret)) {
       return -1;  // Stream error
@@ -140,7 +141,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
       break;  // End of stream, no more data
     }
 
-    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
+    contentStream.write(reinterpret_cast<const char*>(buffer.data()), gsl::narrow<std::streamsize>(ret));
     read_size += ret;
     if (contentStream.fail()) {
       return -1;
@@ -219,7 +220,6 @@ ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile
     : flowFile_(std::move(flowFile)),
       ctx_(ctx),
       logger_(std::move(lgr)) {
-  buffer_.resize(std::min(gsl::narrow<size_t>(flowFile_->getSize()), MAX_BUFFER_SIZE));
 }
 
 REGISTER_RESOURCE(ExtractText, "Extracts the content of a FlowFile and places it into an attribute.");
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index 57cf788a5..ca84a7cdb 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -68,16 +68,14 @@ class ExtractText : public core::Processor {
     return true;
   }
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<core::logging::Logger> lgr);
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
 
    private:
     std::shared_ptr<core::FlowFile> flowFile_;
     core::ProcessContext *ctx_;
-    std::vector<std::byte> buffer_;
     std::shared_ptr<core::logging::Logger> logger_;
   };
 
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 654e5ff1f..7dd9d6423 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -149,11 +149,9 @@ void GenerateFlowFile::onTrigger(core::ProcessContext* /*context*/, core::Proces
       if (fileSize_ > 0) {
         generateData(data, textData_);
       }
-      GenerateFlowFile::WriteCallback callback(data);
-      session->write(flowFile, &callback);
+      session->writeBuffer(flowFile, data);
     } else {
-      GenerateFlowFile::WriteCallback callback(data_);
-      session->write(flowFile, &callback);
+      session->writeBuffer(flowFile, data_);
     }
     session->transfer(flowFile, Success);
   }
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 343e7330f..5788e68aa 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -64,19 +64,6 @@ class GenerateFlowFile : public core::Processor {
   EXTENSIONAPI static const char *DATA_FORMAT_TEXT;
   // Supported Relationships
   EXTENSIONAPI static core::Relationship Success;
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(const std::vector<char>& data)
-        :data_(&data)
-    { }
-    gsl::not_null<const std::vector<char>*> data_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (data_->empty()) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_->data()), data_->size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
 
  public:
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index 721860f36..2d59cdc25 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -189,8 +189,7 @@ void GetFile::getSingleFile(core::ProcessSession& session, const std::string& fi
   flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, file_name);
 
   try {
-    utils::FileReaderCallback file_reader_callback{file_name};
-    session.write(flow_file, &file_reader_callback);
+    session.write(flow_file, utils::FileReaderCallback{file_name});
     session.transfer(flow_file, Success);
     if (!request_.keepSourceFile) {
       auto remove_status = remove(file_name.c_str());
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index 0e3e42241..f3aa31314 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -85,9 +85,7 @@ int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, b
   std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
   std::shared_ptr<core::FlowFile> flowFile = my_session->create();
 
-  DataHandlerCallback callback(message, size);
-
-  my_session->write(flowFile, &callback);
+  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
 
   my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
 
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index 5ab1630a2..53845e6b7 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -89,25 +89,6 @@ class SocketAfterExecute : public utils::AfterExecute<int> {
   std::map<std::string, std::future<int>*> *list_;
 };
 
-class DataHandlerCallback : public OutputStreamCallback {
- public:
-  DataHandlerCallback(uint8_t *message, size_t size)
-      : message_(message),
-        size_(size) {
-  }
-
-  ~DataHandlerCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    const auto write_ret = stream->write(message_, size_);
-    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-  }
-
- private:
-  uint8_t *message_;
-  size_t size_;
-};
-
 class DataHandler {
  public:
   DataHandler(std::shared_ptr<core::ProcessSessionFactory> sessionFactory) // NOLINT
diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp
index f2237a321..58007fe30 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -25,6 +25,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "HashContent.h"
 #include "core/ProcessContext.h"
@@ -91,28 +92,20 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
   }
 
   logger_->log_trace("attempting read");
-  ReadCallback cb(flowFile, *this);
-  session->read(flowFile, &cb);
-  session->transfer(flowFile, Success);
-}
-
-int64_t HashContent::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  // This throws in case algo is not found, but that's fine
-  parent_.logger_->log_trace("Searching for %s", parent_.algoName_);
-  auto algo = HashAlgos.at(parent_.algoName_);
+  session->read(flowFile, [&flowFile, this](const std::shared_ptr<io::BaseStream>& stream) {
+    // This throws in case algo is not found, but that's fine
+    logger_->log_trace("Searching for %s", algoName_);
+    auto algo = HashAlgos.at(algoName_);
 
-  const auto& ret_val = algo(stream);
+    const auto& ret_val = algo(stream);
 
-  flowFile_->setAttribute(parent_.attrKey_, ret_val.first);
+    flowFile->setAttribute(attrKey_, ret_val.first);
 
-  return ret_val.second;
+    return ret_val.second;
+  });
+  session->transfer(flowFile, Success);
 }
 
-HashContent::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent)
-  : flowFile_(flowFile),
-    parent_(parent)
-  {}
-
 REGISTER_RESOURCE(HashContent,"HashContent calculates the checksum of the content of the flowfile and adds it as an attribute. Configuration options exist to select hashing algorithm and set the name of the attribute."); // NOLINT
 
 }  // namespace processors
diff --git a/extensions/standard-processors/processors/HashContent.h b/extensions/standard-processors/processors/HashContent.h
index 66646999c..ce0089f22 100644
--- a/extensions/standard-processors/processors/HashContent.h
+++ b/extensions/standard-processors/processors/HashContent.h
@@ -153,17 +153,6 @@ class HashContent : public core::Processor {
   //! Initialize, over write by NiFi HashContent
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::FlowFile> flowFile_;
-    const HashContent& parent_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 6acffb933..3f3f8ab3a 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -144,14 +144,13 @@ void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     }
     if (logPayload && flow->getSize() <= 1024 * 1024) {
       message << "\n" << "Payload:" << "\n";
-      ReadCallback callback(logger_, gsl::narrow<size_t>(flow->getSize()));
-      session->read(flow, &callback);
+      const auto read_result = session->readBuffer(flow);
 
       std::string printable_payload;
       if (hexencode_) {
-        printable_payload = utils::StringUtils::to_hex(callback.buffer_);
+        printable_payload = utils::StringUtils::to_hex(read_result.buffer);
       } else {
-        printable_payload = utils::span_to<std::string>(gsl::make_span(callback.buffer_).as_span<char>());
+        printable_payload = to_string(read_result);
       }
 
       if (max_line_length_ == 0U) {
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index f7f25da4d..92b30c0f4 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -95,25 +95,6 @@ class LogAttribute : public core::Processor {
       return false;
     }
   }
-  // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(std::shared_ptr<core::logging::Logger> logger, size_t size)
-        : logger_(std::move(logger))
-        , buffer_(size)  {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (buffer_.empty()) return 0U;
-      const auto ret = stream->read(buffer_);
-      if (ret != buffer_.size()) {
-        logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
-        throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
-      }
-      return gsl::narrow<int64_t>(buffer_.size());
-    }
-    std::shared_ptr<core::logging::Logger> logger_;
-    std::vector<std::byte> buffer_;
-  };
 
  public:
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 1e2896306..c6586f331 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -243,7 +243,7 @@ bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<core::FlowF
 
   if (flowFile->getSize() > 0) {
     ReadCallback cb(tmpFile, destFile);
-    session->read(flowFile, &cb);
+    session->read(flowFile, std::ref(cb));
     logger_->log_debug("Committing %s", destFile);
     success = cb.commit();
   } else {
@@ -314,7 +314,7 @@ PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
 }
 
 // Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutFile::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   write_succeeded_ = false;
   size_t size = 0;
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 31899e741..959da1cbe 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -79,11 +79,11 @@ class PutFile : public core::Processor {
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(std::string tmp_file, std::string dest_file);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    ~ReadCallback();
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
     bool commit();
 
    private:
diff --git a/extensions/standard-processors/processors/ReplaceText.cpp b/extensions/standard-processors/processors/ReplaceText.cpp
index 4c9b5608d..cfd7daba1 100644
--- a/extensions/standard-processors/processors/ReplaceText.cpp
+++ b/extensions/standard-processors/processors/ReplaceText.cpp
@@ -175,43 +175,13 @@ ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::
   return parameters;
 }
 
-namespace {
-
-struct ReadFlowFileIntoBuffer : public InputStreamCallback {
-  std::vector<std::byte> buffer_;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    buffer_.resize(stream->size());
-    size_t bytes_read = stream->read(buffer_);
-    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
-  }
-};
-
-struct WriteBufferToFlowFile : public OutputStreamCallback {
-  const std::vector<uint8_t>& buffer_;
-
-  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    size_t bytes_written = stream->write(buffer_, buffer_.size());
-    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-
-}  // namespace
-
 void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
   gsl_Expects(flow_file);
   gsl_Expects(session);
 
   try {
-    const auto read_result = session->readBuffer(flow_file);
-    std::string output = applyReplacements(to_string(read_result), flow_file, parameters);
-    std::vector<uint8_t> modified_text{output.begin(), output.end()};
-
-    WriteBufferToFlowFile write_callback{modified_text};
-    session->write(flow_file, &write_callback);
-
+    const auto input = to_string(session->readBuffer(flow_file));
+    session->writeBuffer(flow_file, applyReplacements(input, flow_file, parameters));
     session->transfer(flow_file, Success);
   } catch (const Exception& exception) {
     logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
@@ -239,7 +209,7 @@ void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& f
       }
       throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
     }};
-    session->readWrite(flow_file, &read_write_callback);
+    session->readWrite(flow_file, std::move(read_write_callback));
     session->transfer(flow_file, Success);
   } catch (const Exception& exception) {
     logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
diff --git a/extensions/standard-processors/processors/RouteText.cpp b/extensions/standard-processors/processors/RouteText.cpp
index 3c97339f0..c14632ede 100644
--- a/extensions/standard-processors/processors/RouteText.cpp
+++ b/extensions/standard-processors/processors/RouteText.cpp
@@ -140,14 +140,14 @@ void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFa
   context->getProperty(GroupingFallbackValue.getName(), group_fallback_);
 }
 
-class RouteText::ReadCallback : public InputStreamCallback {
+class RouteText::ReadCallback {
   using Fn = std::function<void(Segment)>;
 
  public:
   ReadCallback(Segmentation segmentation, size_t file_size, Fn&& fn)
     : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     std::vector<std::byte> buffer;
     buffer.resize(file_size_);
     size_t ret = stream->read(buffer);
@@ -364,7 +364,7 @@ void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *s
     }
     throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
   });
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::move(callback));
 
   for (const auto& [route, content] : flow_file_contents) {
     auto new_flow_file = session->create(flow_file);
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 2e065a6ba..ef0f00307 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -211,7 +211,7 @@ void openFile(const std::string &file_name, uint64_t offset, std::ifstream &inpu
 
 constexpr std::size_t BUFFER_SIZE = 4096;
 
-class FileReaderCallback : public OutputStreamCallback {
+class FileReaderCallback {
  public:
   FileReaderCallback(const std::string &file_name,
                      uint64_t offset,
@@ -222,7 +222,7 @@ class FileReaderCallback : public OutputStreamCallback {
     openFile(file_name, offset, input_stream_, logger_);
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
 
     uint64_t num_bytes_written = 0;
@@ -282,7 +282,7 @@ class FileReaderCallback : public OutputStreamCallback {
   bool latest_flow_file_ends_with_delimiter_ = true;
 };
 
-class WholeFileReaderCallback : public OutputStreamCallback {
+class WholeFileReaderCallback {
  public:
   WholeFileReaderCallback(const std::string &file_name,
                           uint64_t offset,
@@ -295,7 +295,7 @@ class WholeFileReaderCallback : public OutputStreamCallback {
     return checksum_;
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
     std::array<char, BUFFER_SIZE> buffer;
 
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
@@ -796,7 +796,7 @@ void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &se
 
     while (file_reader.hasMoreToRead()) {
       auto flow_file = session->create();
-      session->write(flow_file, &file_reader);
+      session->write(flow_file, std::ref(file_reader));
 
       if (file_reader.useLatestFlowFile()) {
         updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
@@ -816,7 +816,7 @@ void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &se
   } else {
     WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_};
     auto flow_file = session->create();
-    session->write(flow_file, &file_reader);
+    session->write(flow_file, std::ref(file_reader));
 
     updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
     session->transfer(flow_file, Success);
diff --git a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
index 5398434a9..1f7c4e1e0 100644
--- a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
@@ -319,8 +319,6 @@ class FragmentGenerator : public core::Processor {
   void onTrigger(core::ProcessContext*, core::ProcessSession* session) override {
     std::vector<core::FlowFile> flow_files;
     for (const size_t max_i = i_ + batch_size_; i_ < fragment_contents_.size() && i_ < max_i; ++i_) {
-      auto& fragment_content = fragment_contents_[i_];
-      WriteCallback callback(fragment_content);
       std::shared_ptr<core::FlowFile> flow_file = session->create();
       if (base_name_attribute_)
         flow_file->addAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, *base_name_attribute_);
@@ -329,8 +327,9 @@ class FragmentGenerator : public core::Processor {
       if (absolute_path_attribute_)
         flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, *absolute_path_attribute_);
       flow_file->addAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(offset_));
+      auto& fragment_content = fragment_contents_[i_];
       offset_ += fragment_content.size();
-      session->write(flow_file, &callback);
+      session->writeBuffer(flow_file, fragment_content);
       session->transfer(flow_file, Success);
     }
   }
@@ -347,17 +346,6 @@ class FragmentGenerator : public core::Processor {
   void clearBaseNameAttribute() { base_name_attribute_.reset(); }
 
  protected:
-  struct WriteCallback : public org::apache::nifi::minifi::OutputStreamCallback {
-    const gsl::span<const uint8_t> content_;
-
-    explicit WriteCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
-
-    int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> &stream) override {
-      size_t bytes_written = stream->write(content_.begin(), content_.size());
-      return org::apache::nifi::minifi::io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-    }
-  };
-
   size_t offset_ = 0;
   size_t batch_size_ = 1;
   size_t i_ = 0;
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index 42e21f553..279037848 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -109,25 +109,18 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) {
     flow_file->getAttribute("filename", flow_file_name);
     cb_data->logger->log_info("Created flow file: %s", flow_file_name);
 
-    // Initialize callback according to output format
-    std::shared_ptr<OutputStreamCallback> write_cb;
-
-    if (cb_data->format == "PNG") {
-      write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-                                                                  cb_data->frame_buffer,
-                                                                  cb_data->device_width,
-                                                                  cb_data->device_height);
-    } else if (cb_data->format == "RAW") {
-      write_cb = std::make_shared<GetUSBCamera::RawWriteCallback>(cb_data->frame_buffer);
+    if (cb_data->format == "RAW") {
+      session->writeBuffer(flow_file, gsl::make_span(static_cast<const std::byte*>(cb_data->frame_buffer->data), cb_data->frame_buffer->data_bytes));
     } else {
-      cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format);
-      write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-                                                                  cb_data->frame_buffer,
-                                                                  cb_data->device_width,
-                                                                  cb_data->device_height);
+      if (cb_data->format != "PNG") {
+        cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format);
+      }
+      session->write(flow_file, GetUSBCamera::PNGWriteCallback{
+          cb_data->png_write_mtx,
+          cb_data->frame_buffer,
+          cb_data->device_width,
+          cb_data->device_height});
     }
-
-    session->write(flow_file, write_cb.get());
     session->transfer(flow_file, GetUSBCamera::Success);
     session->commit();
   } catch (std::exception &exception) {
@@ -407,7 +400,7 @@ GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> wri
       height_(height) {
 }
 
-int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t GetUSBCamera::PNGWriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   std::lock_guard<std::mutex> lock(*png_write_mtx_);
   logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
   png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
@@ -467,16 +460,6 @@ int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseSt
   return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
-GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
-    : frame_(frame) {
-}
-
-int64_t GetUSBCamera::RawWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  logger_->log_info("Writing %d bytes of raw capture data", frame_->data_bytes);
-  const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(frame_->data), frame_->data_bytes);
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 REGISTER_RESOURCE(GetUSBCamera, "Gets images from USB Video Class (UVC)-compatible devices. Outputs one flow file per frame at the rate specified by the FPS property in the format specified by the Format property.");  // NOLINT line length
 
 } /* namespace processors */
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 49da8c64f..047fe1ec3 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -93,10 +93,10 @@ class GetUSBCamera : public core::Processor {
   static void onFrame(uvc_frame_t *frame, void *ptr);
 
   // Write callback for storing camera capture data in PNG format
-  class PNGWriteCallback : public OutputStreamCallback {
+  class PNGWriteCallback {
    public:
     PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
    private:
     std::shared_ptr<std::mutex> png_write_mtx_;
@@ -107,17 +107,6 @@ class GetUSBCamera : public core::Processor {
     std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PNGWriteCallback>::getLogger();
   };
 
-  // Write callback for storing camera capture data as a raw RGB pixel buffer
-  class RawWriteCallback : public OutputStreamCallback {
-   public:
-    explicit RawWriteCallback(uvc_frame_t *frame);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    uvc_frame_t *frame_;
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RawWriteCallback>::getLogger();
-  };
-
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetUSBCamera>::getLogger();
   static std::shared_ptr<utils::IdGenerator> id_generator_;
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index 69f53402d..434036e75 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -627,29 +627,13 @@ void CollectorInitiatedSubscription::unsubscribe() {
 }
 
 int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::ProcessSession> &session) {
-  struct WriteCallback: public OutputStreamCallback {
-    explicit WriteCallback(const std::string& str)
-      : str_(&str) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_->data()), str_->size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-
-    gsl::not_null<const std::string*> str_;
-  };
-
   int flowFileCount = 0;
 
   std::string xml;
   while (renderedXMLs_.try_dequeue(xml)) {
     auto flowFile = session->create();
 
-    {
-      WriteCallback wc{ xml };
-      session->write(flowFile, &wc);
-    }
+    session->writeBuffer(flowFile, xml);
     session->putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
     session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
     session->transfer(flowFile, s_success);
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index a2c8e7e6b..7a0033340 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -693,24 +693,8 @@ void ConsumeWindowsEventLog::refreshTimeZoneData() {
 }
 
 void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session) const {
-  struct WriteCallback : public OutputStreamCallback {
-    explicit WriteCallback(const std::string& str)
-        : str_(str) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_.c_str()), str_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-
-    const std::string& str_;
-  };
-
   auto commitFlowFile = [&] (const std::shared_ptr<core::FlowFile>& flowFile, const std::string& content, const std::string& mimeType) {
-    {
-      WriteCallback wc{ content };
-      session.write(flowFile, &wc);
-    }
+    session.writeBuffer(flowFile, content);
     session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
     session.putAttribute(flowFile, "Timezone name", timezone_name_);
     session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 171c446a6..ed8ba3355 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -95,24 +95,21 @@ class ProcessSession : public ReferenceContainer {
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
-  int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
-  int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback&& callback) {
-    return read(flow, &callback);
-  }
+  int64_t read(const std::shared_ptr<core::FlowFile> &flow, const io::InputStreamCallback& callback);
   // Read content into buffer
   detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& flow);
   // Execute the given write callback against the content
-  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
-
-  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback&& callback) {
-    return write(flow, &callback);
-  }
+  void write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback);
   // Read and write the flow file at the same time (eg. for processing it line by line)
-  int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback);
+  int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback);
   // Replace content with buffer
   void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
+  void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer);
   // Execute the given write/append callback against the content
-  void append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+  void append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback);
+  // Append buffer to content
+  void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, gsl::span<const char> buffer);
+  void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, gsl::span<const std::byte> buffer);
   // Penalize the flow
   void penalize(const std::shared_ptr<core::FlowFile> &flow);
 
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h b/libminifi/include/core/ProcessSessionReadCallback.h
index 00a6f93af..19a57c8c5 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -32,12 +32,12 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
-class ProcessSessionReadCallback : public InputStreamCallback {
+class ProcessSessionReadCallback {
  public:
   ProcessSessionReadCallback(const std::string &tmpFile, const std::string &destFile,
       std::shared_ptr<logging::Logger> logger);
   ~ProcessSessionReadCallback();
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
   bool commit();
 
  private:
diff --git a/libminifi/src/serialization/PayloadSerializer.cpp b/libminifi/include/io/StreamCallback.h
similarity index 57%
copy from libminifi/src/serialization/PayloadSerializer.cpp
copy to libminifi/include/io/StreamCallback.h
index 8bfa1128a..0116ea746 100644
--- a/libminifi/src/serialization/PayloadSerializer.cpp
+++ b/libminifi/include/io/StreamCallback.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,21 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#pragma once
+
+#include <functional>
+#include <memory>
 
-#include "serialization/PayloadSerializer.h"
-#include "core/ProcessSession.h"
+namespace org::apache::nifi::minifi::io {
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+class BaseStream;
 
-int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
-  InputStreamPipe pipe(out);
-  return reader_(flowFile, &pipe);
-}
+// FlowFile IO Callback functions for input and output
+// throw exception for error
+using InputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream)>;
+using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& output_stream)>;
+using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream, const std::shared_ptr<BaseStream>& output_stream)>;
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index ab7236759..1e3a23a37 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -20,69 +20,26 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <utility>
 #include "BaseStream.h"
+#include "StreamCallback.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-class InputStreamCallback {
- public:
-  virtual ~InputStreamCallback() = default;
-
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
-};
-class OutputStreamCallback {
- public:
-  virtual ~OutputStreamCallback() = default;
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
-};
-class InputOutputStreamCallback {
- public:
-  virtual ~InputOutputStreamCallback() = default;
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
-};
-
-class FunctionOutputStreamCallback : public OutputStreamCallback {
- public:
-  explicit FunctionOutputStreamCallback(std::function<int64_t(const std::shared_ptr<io::OutputStream>&)> fn) : fn_(std::move(fn)) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return fn_(stream);
-  }
- private:
-  std::function<int64_t(const std::shared_ptr<io::OutputStream>&)> fn_;
-};
-
-class FunctionInputStreamCallback : public InputStreamCallback {
- public:
-  explicit FunctionInputStreamCallback(std::function<int64_t(const std::shared_ptr<io::InputStream>&)> fn) : fn_(std::move(fn)) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return fn_(stream);
-  }
- private:
-  std::function<int64_t(const std::shared_ptr<io::InputStream>&)> fn_;
-};
-
+namespace org::apache::nifi::minifi {
 namespace internal {
 
-inline int64_t pipe(io::InputStream* src, io::OutputStream* dst) {
+inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) {
   std::array<std::byte, 4096> buffer{};
   int64_t totalTransferred = 0;
   while (true) {
-    const auto readRet = src->read(buffer);
+    const auto readRet = src.read(buffer);
     if (io::isError(readRet)) return -1;
     if (readRet == 0) break;
     auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
-      const auto writeRet = dst->write(gsl::make_span(buffer).subspan(transferred, remaining));
+      const auto writeRet = dst.write(gsl::make_span(buffer).subspan(transferred, remaining));
       // TODO(adebreceni):
       //   write might return 0, e.g. in case of a congested server
       //   what should we return then?
@@ -99,39 +56,30 @@ inline int64_t pipe(io::InputStream* src, io::OutputStream* dst) {
   return totalTransferred;
 }
 
-inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
-  return pipe(src.get(), dst.get());
-}
-
 }  // namespace internal
 
-class InputStreamPipe : public InputStreamCallback {
+class InputStreamPipe {
  public:
-  explicit InputStreamPipe(std::shared_ptr<io::OutputStream> output) : output_(std::move(output)) {}
+  explicit InputStreamPipe(io::OutputStream& output) : output_(&output) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return internal::pipe(stream, output_);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+    return internal::pipe(*stream, *output_);
   }
 
  private:
-  std::shared_ptr<io::OutputStream> output_;
+  gsl::not_null<io::OutputStream*> output_;
 };
 
-class OutputStreamPipe : public OutputStreamCallback {
+class OutputStreamPipe {
  public:
-  explicit OutputStreamPipe(std::shared_ptr<io::InputStream> input) : input_(std::move(input)) {}
+  explicit OutputStreamPipe(io::InputStream& input) : input_(&input) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return internal::pipe(input_, stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+    return internal::pipe(*input_, *stream);
   }
 
  private:
-  std::shared_ptr<io::InputStream> input_;
+  gsl::not_null<io::InputStream*> input_;
 };
 
-
-
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/serialization/FlowFileSerializer.h b/libminifi/include/serialization/FlowFileSerializer.h
index 539306873..6948a0042 100644
--- a/libminifi/include/serialization/FlowFileSerializer.h
+++ b/libminifi/include/serialization/FlowFileSerializer.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <utility>
 #include <functional>
+#include "io/StreamCallback.h"
 
 namespace org {
 namespace apache {
@@ -38,11 +39,9 @@ class FlowFile;
 
 } /* namespace core */
 
-class InputStreamCallback;
-
 class FlowFileSerializer {
  public:
-  using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;
+  using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, const io::InputStreamCallback&)>;
 
   explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {}
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 632f4c14c..be1b3b0ab 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -266,60 +266,6 @@ class SiteToSiteClient : public core::Connectable {
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
 
-// Nest Callback Class for write stream
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(DataPacket *packet) // NOLINT
-      : _packet(packet) {
-  }
-  DataPacket *_packet;
-  // void process(std::ofstream *stream) {
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    std::array<std::byte, 16384> buffer{};
-    uint64_t len = _packet->_size;
-    uint64_t total = 0;
-    while (len > 0) {
-      const auto size = std::min(len, uint64_t{16384});
-      const auto ret = _packet->transaction_->getStream().read(buffer);
-      if (ret != size) {
-        core::logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
-        return -1;
-      }
-      stream->write(buffer);
-      len -= size;
-      total += size;
-    }
-    core::logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
-    return gsl::narrow<int64_t>(len);
-  }
-};
-// Nest Callback Class for read stream
-class ReadCallback : public InputStreamCallback {
- public:
-  ReadCallback(DataPacket *packet) // NOLINT
-      : _packet(packet) {
-  }
-  DataPacket *_packet;
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    _packet->_size = 0;
-    std::array<std::byte, 8192> buffer{};
-    size_t size = 0;
-    do {
-      const auto readSize = stream->read(buffer);
-      if (readSize == 0) break;
-      if (io::isError(readSize)) return -1;
-      const auto ret = _packet->transaction_->getStream().write(gsl::make_span(buffer).subspan(0, readSize));
-      if (io::isError(ret) || gsl::narrow<size_t>(ret) != readSize) {
-        core::logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
-        return -1;
-      }
-      size += readSize;
-    } while (size < stream->size());
-    _packet->_size = size;
-    return gsl::narrow<int64_t>(size);
-  }
-};
-
 }  // namespace sitetosite
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index d07e842da..bbf6dbb8c 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -31,12 +31,11 @@ namespace org::apache::nifi::minifi::utils {
 /**
  * General vector based uint8_t callback.
  */
-class ByteInputCallBack : public InputStreamCallback {
+class ByteInputCallback {
  public:
-  ByteInputCallBack() = default;
-  ~ByteInputCallBack() override = default;
+  virtual ~ByteInputCallback() = default;
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     stream->seek(0);
 
     if (stream->size() > 0) {
@@ -76,13 +75,13 @@ class ByteInputCallBack : public InputStreamCallback {
  * While calls are thread safe, the class is intended to have
  * a single consumer.
  */
-class ByteOutputCallback : public OutputStreamCallback {
+class ByteOutputCallback {
  public:
   ByteOutputCallback() = delete;
 
   explicit ByteOutputCallback(size_t max_size, bool wait_on_read = false)
       : max_size_(max_size),
-        read_started_(wait_on_read ? false : true),
+        read_started_(!wait_on_read),
         logger_(core::logging::LoggerFactory<ByteOutputCallback>::getLogger()) {
     current_str_pos = 0;
     size_ = 0;
@@ -95,7 +94,7 @@ class ByteOutputCallback : public OutputStreamCallback {
     close();
   }
 
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
   virtual std::vector<char> to_string();
 
@@ -141,9 +140,9 @@ class StreamOutputCallback : public ByteOutputCallback {
       : ByteOutputCallback(max_size, wait_on_read) {
   }
 
-  virtual void write(char *data, size_t size);
+  void write(char *data, size_t size) override;
 
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override;
 };
 
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/FileReaderCallback.h b/libminifi/include/utils/FileReaderCallback.h
index a36822221..e1030a547 100644
--- a/libminifi/include/utils/FileReaderCallback.h
+++ b/libminifi/include/utils/FileReaderCallback.h
@@ -33,13 +33,13 @@ namespace utils {
 /**
  * Simple callback to read a file, to be used with ProcessSession::write().
  */
-class FileReaderCallback : public OutputStreamCallback {
+class FileReaderCallback {
  public:
-  explicit FileReaderCallback(const std::string& file_name);
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override;
+  explicit FileReaderCallback(std::string file_name);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) const;
 
  private:
-  std::ifstream input_stream_;
+  std::string file_name_;
   std::shared_ptr<core::logging::Logger> logger_;
 };
 
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 6d07ef521..91dc71394 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -59,7 +59,7 @@ struct HTTPUploadCallback {
   }
   std::mutex mutex;
   std::atomic<bool> stop;
-  ByteInputCallBack *ptr;
+  ByteInputCallback *ptr;
   size_t pos;
 
   size_t getPos() {
diff --git a/libminifi/include/utils/JsonCallback.h b/libminifi/include/utils/JsonCallback.h
index 3fb278d75..03887704d 100644
--- a/libminifi/include/utils/JsonCallback.h
+++ b/libminifi/include/utils/JsonCallback.h
@@ -33,18 +33,19 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-class JsonOutputCallback : public OutputStreamCallback {
+class JsonOutputCallback {
  public:
   explicit JsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
       writer.SetMaxDecimalPlaces(decimal_places_.value());
     root_.Accept(writer);
-    return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+    const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
+    return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
   }
 
  protected:
@@ -52,18 +53,19 @@ class JsonOutputCallback : public OutputStreamCallback {
   std::optional<uint8_t> decimal_places_;
 };
 
-class PrettyJsonOutputCallback : public OutputStreamCallback {
+class PrettyJsonOutputCallback {
  public:
   explicit PrettyJsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
       writer.SetMaxDecimalPlaces(decimal_places_.value());
     root_.Accept(writer);
-    return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+    const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
+    return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
   }
 
  protected:
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
index cf0db4b8b..6538020e2 100644
--- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -28,11 +28,11 @@
 
 namespace org::apache::nifi::minifi::utils {
 
-class LineByLineInputOutputStreamCallback : public InputOutputStreamCallback {
+class LineByLineInputOutputStreamCallback {
  public:
   using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
   explicit LineByLineInputOutputStreamCallback(CallbackType callback);
-  int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) override;
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output);
 
  private:
   int64_t readInput(io::InputStream& stream);
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index d43486bc2..cf8d8c691 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -401,6 +401,7 @@ class StringUtils {
     return to_hex(gsl::make_span(str).as_span<const std::byte>(), uppercase);
   }
 
+
   /**
    * Decodes the Base64 encoded string into data
    * @param data the output buffer where the decoded bytes will be written. Must be at least (base64_length / 4 + 1) * 3 bytes long.
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index dbc349ea2..d3c635a1d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -669,7 +669,7 @@ C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::Inp
     if (!archiver->newEntry({filename, file_size})) {
       throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'");
     }
-    if (gsl::narrow<int64_t>(file_size) != internal::pipe(stream.get(), archiver.get())) {
+    if (gsl::narrow<int64_t>(file_size) != internal::pipe(*stream, *archiver)) {
       // we have touched the input streams, they cannot be reused
       throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle");
     }
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1c3f0c87d..eabebf5d5 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -232,7 +232,7 @@ void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relat
   flow->setDeleted(false);
 }
 
-void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
   auto flow_file_equality_checker = [&flow](const auto& flow_file) { return flow == flow_file; };
   gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
       || _addedFlowFiles.contains(flow->getUUID())
@@ -247,7 +247,7 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
     }
-    if (callback->process(stream) < 0) {
+    if (callback(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
 
@@ -269,18 +269,16 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
 }
 
 void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
-  struct BufferOutputStreamCallback : OutputStreamCallback {
-    explicit BufferOutputStreamCallback(gsl::span<const char> buffer) :buffer{buffer} {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
-      return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());
-    }
-    gsl::span<const char> buffer;
-  };
-  BufferOutputStreamCallback cb{ buffer };
-  write(flow_file, &cb);
+  writeBuffer(flow_file, buffer.as_span<const std::byte>());
+}
+void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
+  write(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+    const auto write_status = output_stream->write(buffer);
+    return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+  });
 }
 
-void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
   auto flow_file_equality_checker = [&flow](const auto& flow_file) { return flow == flow_file; };
   gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
       || _addedFlowFiles.contains(flow->getUUID())
@@ -305,7 +303,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     // this prevents an issue if we write, above, with zero length.
     if (stream_size_before_callback > 0)
       stream->seek(stream_size_before_callback + 1);
-    if (callback->process(stream) < 0) {
+    if (callback(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
     flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
@@ -322,8 +320,17 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     throw;
   }
 }
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
+  appendBuffer(flow_file, buffer.as_span<const std::byte>());
+}
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
+  append(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+    const auto write_status = output_stream->write(buffer);
+    return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+  });
+}
 
-int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
+int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, const io::InputStreamCallback& callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
@@ -345,7 +352,7 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
     }
 
     auto flow_file_stream = std::make_shared<io::StreamSlice>(stream, flow->getOffset(), flow->getSize());
-    auto ret = callback->process(flow_file_stream);
+    auto ret = callback(flow_file_stream);
     if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
@@ -359,7 +366,8 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
   }
 }
 
-int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback) {
+
+int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) {
   gsl_Expects(callback);
 
   try {
@@ -384,7 +392,7 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, I
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
     }
 
-    int64_t bytes_written = callback->process(input_stream, output_stream);
+    int64_t bytes_written = callback(input_stream, output_stream);
     if (bytes_written < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
@@ -408,25 +416,15 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, I
 
 detail::ReadBufferResult ProcessSession::readBuffer(const std::shared_ptr<core::FlowFile>& flow) {
   detail::ReadBufferResult result;
-  struct Callback : InputStreamCallback {
-    detail::ReadBufferResult& result;
-    ProcessSession& session;
-    Callback(detail::ReadBufferResult& result, ProcessSession& session)
-      :result{result}, session{session}
-    {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) final {
-      gsl_Expects(inputStream);
-      result.buffer.resize(inputStream->size());
-      const auto read_status = inputStream->read(result.buffer);
-      if (read_status != result.buffer.size()) {
-        session.logger_->log_error("readBuffer: %zu bytes were requested from the stream but %zu bytes were read. Rolling back.", result.buffer.size(), read_status);
-        throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
-      }
-      return gsl::narrow<int64_t>(read_status);
+  result.status = read(flow, [&result, this](const std::shared_ptr<io::BaseStream>& input_stream) {
+    result.buffer.resize(input_stream->size());
+    const auto read_status = input_stream->read(result.buffer);
+    if (read_status != result.buffer.size()) {
+      logger_->log_error("readBuffer: %zu bytes were requested from the stream but %zu bytes were read. Rolling back.", result.buffer.size(), read_status);
+      throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
     }
-  };
-  Callback cb{result, *this};
-  result.status = read(flow, &cb);
+    return gsl::narrow<int64_t>(read_status);
+  });
   return result;
 }
 
@@ -686,7 +684,7 @@ bool ProcessSession::exportContent(const std::string &destination, const std::st
   logger_->log_debug("Exporting content of %s to %s", flow->getUUIDStr(), destination);
 
   ProcessSessionReadCallback cb(tmpFile, destination, logger_);
-  read(flow, &cb);
+  read(flow, std::ref(cb));
 
   logger_->log_info("Committing %s", destination);
   bool commit_ok = cb.commit();
@@ -1022,8 +1020,8 @@ void ProcessSession::ensureNonNullResourceClaim(
         logger_->log_debug("Processor %s (%s) did not create a ResourceClaim, creating an empty one",
                            process_context_->getProcessorNode()->getUUIDStr(),
                            process_context_->getProcessorNode()->getName());
-        OutputStreamPipe emptyStreamCallback(std::make_shared<io::BufferStream>());
-        write(flowFile, &emptyStreamCallback);
+        io::BufferStream emptyBufferStream;
+        write(flowFile, OutputStreamPipe{emptyBufferStream});
       }
     }
   }
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 7d925b595..810e46c89 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -43,7 +43,7 @@ ProcessSessionReadCallback::ProcessSessionReadCallback(const std::string &tmpFil
 }
 
 // Copy the entire file contents to the temporary file
-int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ProcessSessionReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   _writeSucceeded = false;
   size_t size = 0;
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp
index 588c36ffc..5bd69c40e 100644
--- a/libminifi/src/serialization/FlowFileV3Serializer.cpp
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -92,9 +92,8 @@ int64_t FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& f
     if (io::isError(ret)) return -1;
     sum += ret;
   }
-  InputStreamPipe pipe(out);
   {
-    const auto ret = reader_(flowFile, &pipe);
+    const auto ret = reader_(flowFile, InputStreamPipe{*out});
     if (ret < 0) return -1;
     sum += gsl::narrow<size_t>(ret);
   }
diff --git a/libminifi/src/serialization/PayloadSerializer.cpp b/libminifi/src/serialization/PayloadSerializer.cpp
index 8bfa1128a..4da0d0bf3 100644
--- a/libminifi/src/serialization/PayloadSerializer.cpp
+++ b/libminifi/src/serialization/PayloadSerializer.cpp
@@ -25,8 +25,7 @@ namespace nifi {
 namespace minifi {
 
 int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
-  InputStreamPipe pipe(out);
-  return reader_(flowFile, &pipe);
+  return reader_(flowFile, InputStreamPipe{*out});
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 968ea397c..fbe118ce4 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -471,8 +471,12 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
       return -1;
     }
     if (flowFile->getSize() > 0) {
-      sitetosite::ReadCallback callback(packet);
-      session->read(flowFile, &callback);
+      session->read(flowFile, [packet](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+        const auto result = internal::pipe(*input_stream, packet->transaction_->getStream());
+        if (result == -1) return -1;
+        packet->_size = gsl::narrow<size_t>(result);
+        return result;
+      });
       if (flowFile->getSize() != packet->_size) {
         logger_->log_debug("Mismatched sizes %llu %llu", flowFile->getSize(), packet->_size);
         return -2;
@@ -694,8 +698,9 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
       }
 
       if (packet._size > 0) {
-        sitetosite::WriteCallback callback(&packet);
-        session->write(flowFile, &callback);
+        session->write(flowFile, [&packet](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+          return internal::pipe(packet.transaction_->getStream(), *output_stream);
+        });
         if (flowFile->getSize() != packet._size) {
           std::stringstream message;
           message << "Receive size not correct, expected to send " << flowFile->getSize() << " bytes, but actually sent " << packet._size;
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 429cba73b..8fe16a946 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -30,7 +30,7 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ByteOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   if (stream->size() > 0) {
     std::vector<std::byte> buffer;
@@ -42,7 +42,7 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   return gsl::narrow<int64_t>(size_.load());
 }
 
-int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t StreamOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
diff --git a/libminifi/src/utils/FileReaderCallback.cpp b/libminifi/src/utils/FileReaderCallback.cpp
index ee5958655..825ae3647 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/FileReaderCallback.cpp
@@ -35,31 +35,32 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-FileReaderCallback::FileReaderCallback(const std::string& file_name)
-    : logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) {
-  logger_->log_debug("Opening %s", file_name);
-  input_stream_.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
-  if (!input_stream_.is_open()) {
-    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)), errno);
-  }
+FileReaderCallback::FileReaderCallback(std::string file_name)
+    : file_name_{std::move(file_name)},
+    logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) {
 }
 
-int64_t FileReaderCallback::process(const std::shared_ptr<io::BaseStream>& output_stream) {
+int64_t FileReaderCallback::operator()(const std::shared_ptr<io::BaseStream>& output_stream) const {
   std::array<char, BUFFER_SIZE> buffer;
   uint64_t num_bytes_written = 0;
 
-  while (input_stream_.good()) {
-    input_stream_.read(buffer.data(), buffer.size());
-    if (input_stream_.bad()) {
+  std::ifstream input_stream{file_name_, std::ifstream::in | std::ifstream::binary};
+  if (!input_stream.is_open()) {
+    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)), errno);
+  }
+  logger_->log_debug("Opening %s", file_name_);
+  while (input_stream.good()) {
+    input_stream.read(buffer.data(), buffer.size());
+    if (input_stream.bad()) {
       throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading file: ", std::strerror(errno)), errno);
     }
-    const auto num_bytes_read = input_stream_.gcount();
+    const auto num_bytes_read = input_stream.gcount();
     logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
-    const int len = gsl::narrow<int>(num_bytes_read);
+    const auto len = gsl::narrow<size_t>(num_bytes_read);
     output_stream->write(reinterpret_cast<uint8_t*>(buffer.data()), len);
     num_bytes_written += len;
   }
-  input_stream_.close();
+  input_stream.close();
 
   logger_->log_debug("Finished reading %" PRIu64 " bytes from the file", num_bytes_written);
   return num_bytes_written;
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
index 4df55fde4..f06ba7786 100644
--- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -25,7 +25,7 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
   : callback_(std::move(callback)) {
 }
 
-int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
   gsl_Expects(input);
   gsl_Expects(output);
 
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
index ba8564eff..72434b258 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.cpp
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -16,10 +16,7 @@
  */
 #include "ReadFromFlowFileTestProcessor.h"
 
-#include <string>
 #include <utility>
-#include <vector>
-
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -34,18 +31,6 @@ void ReadFromFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::Proc
   logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
 }
 
-namespace {
-struct ReadFlowFileIntoBuffer : public InputStreamCallback {
-  std::vector<std::byte> buffer_;
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    buffer_.resize(stream->size());
-    size_t bytes_read = stream->read(buffer_);
-    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
-  }
-};
-}  // namespace
-
 void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
@@ -53,8 +38,6 @@ void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, cor
     clear();
 
   while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
-    ReadFlowFileIntoBuffer callback;
-    session->read(flow_file, &callback);
     session->transfer(flow_file, Success);
     flow_files_read_.emplace_back(session, gsl::not_null(std::move(flow_file)));
   }
@@ -65,9 +48,7 @@ void ReadFromFlowFileTestProcessor::onUnSchedule() {
 }
 
 ReadFromFlowFileTestProcessor::FlowFileData::FlowFileData(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file) {
-  ReadFlowFileIntoBuffer callback;
-  session->read(flow_file, &callback);
-  content_ = utils::span_to<std::string>(gsl::make_span(callback.buffer_).as_span<char>());
+  content_ = to_string(session->readBuffer(flow_file));
   attributes_ = flow_file->getAttributes();
 }
 
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index efa20c362..6e94d80e9 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -603,7 +603,7 @@ std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
   auto content_claim = file.getResourceClaim();
   auto content_stream = content_repo_->read(*content_claim);
   auto output_stream = std::make_shared<minifi::io::BufferStream>();
-  minifi::InputStreamPipe{output_stream}.process(content_stream);
+  minifi::InputStreamPipe{*output_stream}(content_stream);
   return utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
 }
 
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.cpp b/libminifi/test/WriteToFlowFileTestProcessor.cpp
index 90a509029..f8fa19026 100644
--- a/libminifi/test/WriteToFlowFileTestProcessor.cpp
+++ b/libminifi/test/WriteToFlowFileTestProcessor.cpp
@@ -17,9 +17,6 @@
 
 #include "WriteToFlowFileTestProcessor.h"
 
-#include <string>
-#include <vector>
-
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -35,19 +32,6 @@ void WriteToFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::Proce
   logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
 }
 
-namespace {
-struct WriteToFlowFileCallback : public OutputStreamCallback {
-  const gsl::span<const uint8_t> content_;
-
-  explicit WriteToFlowFileCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    size_t bytes_written = stream->write(content_.begin(), content_.size());
-    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-}  // namespace
-
 void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
@@ -60,8 +44,7 @@ void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core
     logger_->log_error("Failed to create flowfile!");
     return;
   }
-  WriteToFlowFileCallback callback(content_);
-  session->write(flow_file, &callback);
+  session->writeBuffer(flow_file, content_);
   session->transfer(flow_file, Success);
 }
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 6804e3a45..2792b416c 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -44,7 +44,7 @@
 #include "../Utils.h"
 #include "utils/gsl.h"
 
-class ReadCallback : public minifi::InputStreamCallback {
+class ReadCallback {
  public:
   explicit ReadCallback(size_t size)
       :buffer_{size}
@@ -54,7 +54,7 @@ class ReadCallback : public minifi::InputStreamCallback {
   ReadCallback& operator=(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback&&) = delete;
 
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     int64_t total_read = 0;
     do {
       const auto ret = stream->read(gsl::make_span(buffer_).subspan(read_size_));
@@ -175,7 +175,7 @@ class CompressDecompressionTestController : public TestController{
   }
 
   void read(const std::shared_ptr<core::FlowFile>& file, ReadCallback& reader) {
-    helper_session->read(file, &reader);
+    helper_session->read(file, std::ref(reader));
   }
 
  public:
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 932ec04c5..0ed9fa877 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -65,7 +65,7 @@ void init_file_paths() {
   static Initializer initializer;
 }
 
-class FixedBuffer : public minifi::InputStreamCallback {
+class FixedBuffer {
  public:
   explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
     buf_.reset(new uint8_t[capacity_]);
@@ -97,7 +97,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     } while (size_ != capacity_);
     return total_read;
   }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     return write(*stream, capacity_);
   }
 
@@ -246,13 +246,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]")
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -308,13 +308,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefil
   REQUIRE(flow1->getSize() == 128);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 128);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -371,13 +371,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefile
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 64);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -419,13 +419,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -463,7 +463,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
   REQUIRE(flow1->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
@@ -473,7 +473,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
   REQUIRE(flow2->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
@@ -514,7 +514,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
   REQUIRE(flow1->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
@@ -524,7 +524,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
   REQUIRE(flow2->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
@@ -572,12 +572,12 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]"
   std::shared_ptr<core::FlowFile> flow2 = output_->poll(expiredFlowRecords);
   {
     FixedBuffer callback(flow1->getSize());
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   {
     FixedBuffer callback(flow2->getSize());
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -697,10 +697,10 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
 
   core::ProcessSession session(context);
 
-  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, const minifi::io::InputStreamCallback& cb) {
     return session.read(ff, cb);
   });
-  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, const minifi::io::InputStreamCallback& cb) {
     return session.read(ff, cb);
   });
 
@@ -768,7 +768,7 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
   REQUIRE(expiredFlowRecords.empty());
   {
     FixedBuffer callback(flow->getSize());
-    session.read(flow, &callback);
+    session.read(flow, std::ref(callback));
     REQUIRE(callback.to_string() == expected);
   }
 
@@ -812,13 +812,13 @@ TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]")
   REQUIRE(flow1);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 276a06445..19e51af29 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -109,8 +109,7 @@ struct TestFlow{
     return flow;
   }
   std::string read(const std::shared_ptr<core::FlowFile>& file) {
-    core::ProcessSession session(processorContext);
-    return to_string(session.readBuffer(file));
+    return to_string(core::ProcessSession{processorContext}.readBuffer(file));
   }
   void trigger() {
     auto session = std::make_shared<core::ProcessSession>(processorContext);
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 49c735e7d..dc4d656ac 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -33,21 +33,10 @@
 
 namespace ContentRepositoryDependentTests {
 
-struct WriteStringToFlowFile : public minifi::OutputStreamCallback {
-  const std::vector<uint8_t> buffer_;
-
-  explicit WriteStringToFlowFile(const std::string& buffer) : buffer_(buffer.begin(), buffer.end()) {}
-
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
-    size_t bytes_written = stream->write(buffer_, buffer_.size());
-    return minifi::io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-
-struct ReadUntilItCan : public minifi::InputStreamCallback {
+struct ReadUntilItCan {
   std::string value_;
 
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream> &stream) {
     value_.clear();
     std::array<std::byte, 1024> buffer{};
     size_t bytes_read = 0;
@@ -93,14 +82,12 @@ class Fixture {
   }
 
   void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content) {
-    WriteStringToFlowFile callback(content);
-    process_session_->write(flow_file, &callback);
+    process_session_->writeBuffer(flow_file, content);
   }
 
   void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
-    WriteStringToFlowFile callback(content_to_append);
     process_session_->add(flow_file);
-    process_session_->append(flow_file, &callback);
+    process_session_->appendBuffer(flow_file, content_to_append);
   }
 
  private:
@@ -125,17 +112,17 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   REQUIRE(clone_second_half != nullptr);
   ReadUntilItCan read_until_it_can_callback;
   const auto read_result_original = process_session.readBuffer(original_ff);
-  process_session.read(original_ff, &read_until_it_can_callback);
+  process_session.read(original_ff, std::ref(read_until_it_can_callback));
   CHECK(original_ff->getSize() == 6);
   CHECK(to_string(read_result_original) == "foobar");
   CHECK(read_until_it_can_callback.value_ == "foobar");
   const auto read_result_first_half = process_session.readBuffer(clone_first_half);
-  process_session.read(clone_first_half, &read_until_it_can_callback);
+  process_session.read(clone_first_half, std::ref(read_until_it_can_callback));
   CHECK(clone_first_half->getSize() == 3);
   CHECK(to_string(read_result_first_half) == "foo");
   CHECK(read_until_it_can_callback.value_ == "foo");
   const auto read_result_second_half = process_session.readBuffer(clone_second_half);
-  process_session.read(clone_second_half, &read_until_it_can_callback);
+  process_session.read(clone_second_half, std::ref(read_until_it_can_callback));
   CHECK(clone_second_half->getSize() == 3);
   CHECK(to_string(read_result_second_half) == "bar");
   CHECK(read_until_it_can_callback.value_ == "bar");
@@ -155,7 +142,7 @@ void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
   CHECK(flow_file->getSize() == 8);
   ReadUntilItCan read_until_it_can_callback;
   const auto read_result = process_session.readBuffer(flow_file);
-  process_session.read(flow_file, &read_until_it_can_callback);
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
   CHECK(to_string(read_result) == "myfoobar");
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
@@ -173,8 +160,8 @@ void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
   CHECK(flow_file->getSize() == 8);
   const auto read_result = process_session.readBuffer(flow_file);
   ReadUntilItCan read_until_it_can_callback;
-  process_session.read(flow_file, &read_until_it_can_callback);
   CHECK(to_string(read_result) == "myfoobar");
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 
@@ -187,7 +174,6 @@ void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> con
 
   CHECK(flow_file->getSize() == 0);
   REQUIRE_NOTHROW(process_session.readBuffer(flow_file));
-  ReadUntilItCan read_until_it_can_callback;
-  REQUIRE_NOTHROW(process_session.read(flow_file, &read_until_it_can_callback));
+  REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{}));
 }
 }  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp
index aa8ffa3d3..dcfca26eb 100644
--- a/libminifi/test/unit/FlowFileSerializationTests.cpp
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -47,8 +47,8 @@ TEST_CASE("Payload Serializer", "[testPayload]") {
   flowFile->addAttribute("first", "one");
   flowFile->addAttribute("second", "two");
 
-  minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
-    return gsl::narrow<int>(cb->process(contentStream));
+  minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, const minifi::io::InputStreamCallback& cb) {
+    return cb(contentStream);
   });
   serializer.serialize(flowFile, result);
   const auto serialized = utils::span_to<std::string>(result->getBuffer().as_span<const char>());
@@ -67,8 +67,8 @@ TEST_CASE("FFv3 Serializer", "[testFFv3]") {
   flowFile->addAttribute("first", "one");
   flowFile->addAttribute("second", "two");
 
-  minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
-    return gsl::narrow<int>(cb->process(contentStream));
+  minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, const minifi::io::InputStreamCallback& cb) {
+    return cb(contentStream);
   });
   serializer.serialize(flowFile, result);
   const auto serialized = utils::span_to<std::string>(result->getBuffer().as_span<const char>());
diff --git a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
index 1266ec615..74e59f1c6 100644
--- a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
+++ b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
@@ -70,7 +70,7 @@ TEST_CASE("LineByLineInputOutputStreamCallback can process a stream line by line
   }
 
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   const auto output_data = utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
   CHECK(output_data == expected_output);
 }
@@ -91,7 +91,7 @@ TEST_CASE("LineByLineInputOutputStreamCallback can handle Windows line endings",
                                "3: Five six, picking up sticks\r\n";
 
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   const auto output_data = utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
   CHECK(output_data == expected_output);
 }
@@ -101,6 +101,6 @@ TEST_CASE("LineByLineInputOutputStreamCallback can handle an empty input", "[pro
   const auto output_stream = std::make_shared<minifi::io::BufferStream>();
   const auto line_processor = [](const std::string& input_line, bool, bool) { return input_line; };
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   CHECK(output_stream->size() == 0);
 }
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index 73f4751b2..deec59861 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -156,7 +156,7 @@ using namespace minifi::io;
 std::string decompress(const std::shared_ptr<InputStream>& input) {
   auto output = std::make_unique<BufferStream>();
   auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));
-  minifi::internal::pipe(input, decompressor);
+  minifi::internal::pipe(*input, *decompressor);
   decompressor->close();
   return utils::span_to<std::string>(output->getBuffer().as_span<const char>());
 }