You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/05/03 15:06:03 UTC

[nifi-minifi-cpp] branch main updated (863f5d85d -> 2d9784504)

This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from 863f5d85d MINIFICPP-1808 Improve RawSocketProtocol authorization error logging
     new 3070b8550 MINIFICPP-1717 Refactor *StreamCallback to be function objects
     new b9545e7dc MINIFICPP-1721 Fix GetFileTests transient failure
     new 2d9784504 MINIFICPP-1816 adapt bootstrap to Ubuntu 22.04

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 aptitude.sh                                        |  6 +-
 extensions/aws/processors/FetchS3Object.cpp        | 28 ++++---
 extensions/aws/processors/FetchS3Object.h          | 21 ------
 extensions/aws/processors/PutS3Object.cpp          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  4 +-
 .../azure/processors/FetchAzureBlobStorage.cpp     | 40 +++-------
 .../azure/processors/FetchAzureDataLakeStorage.cpp | 13 +++-
 .../azure/processors/FetchAzureDataLakeStorage.h   | 28 -------
 .../azure/processors/PutAzureBlobStorage.cpp       |  2 +-
 extensions/azure/processors/PutAzureBlobStorage.h  |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  6 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  2 +-
 extensions/bustache/ApplyTemplate.cpp              | 58 ++++++---------
 extensions/bustache/ApplyTemplate.h                |  5 +-
 extensions/civetweb/processors/ListenHTTP.cpp      | 15 +---
 extensions/civetweb/processors/ListenHTTP.h        | 10 ---
 extensions/gcp/processors/PutGCSObject.cpp         |  6 +-
 extensions/http-curl/client/HTTPCallback.h         |  6 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |  8 +-
 extensions/http-curl/protocols/RESTSender.cpp      |  6 +-
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |  1 -
 extensions/jni/jvm/JniProcessSession.cpp           | 29 +++-----
 extensions/jni/jvm/JniReferenceObjects.h           | 21 +-----
 extensions/libarchive/CompressContent.cpp          | 14 ++--
 extensions/libarchive/CompressContent.h            | 62 ++++++----------
 extensions/libarchive/FocusArchiveEntry.cpp        |  9 +--
 extensions/libarchive/FocusArchiveEntry.h          |  5 +-
 extensions/libarchive/ManipulateArchive.cpp        |  7 +-
 extensions/libarchive/MergeContent.cpp             | 11 +--
 extensions/libarchive/MergeContent.h               | 12 +--
 extensions/libarchive/UnfocusArchiveEntry.cpp      |  4 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |  4 +-
 extensions/librdkafka/ConsumeKafka.cpp             | 10 +--
 extensions/librdkafka/ConsumeKafka.h               | 11 ---
 extensions/librdkafka/PublishKafka.cpp             |  8 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 17 ++++-
 extensions/mqtt/processors/ConsumeMQTT.h           | 22 ------
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |  2 +-
 extensions/mqtt/processors/ConvertJSONAck.cpp      | 17 ++---
 extensions/mqtt/processors/ConvertJSONAck.h        |  1 -
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  5 +-
 extensions/opc/include/fetchopc.h                  | 13 +---
 extensions/opc/src/fetchopc.cpp                    |  5 +-
 extensions/opc/src/putopc.cpp                      |  1 +
 extensions/opencv/CaptureRTSPFrame.cpp             |  9 ++-
 extensions/opencv/CaptureRTSPFrame.h               | 32 +-------
 extensions/opencv/FrameIO.h                        | 79 --------------------
 extensions/opencv/MotionDetector.cpp               | 22 ++++--
 .../SourceInitiatedSubscriptionListener.cpp        | 25 +------
 .../SourceInitiatedSubscriptionListener.h          | 23 +-----
 extensions/pdh/PerformanceDataMonitor.cpp          |  4 +-
 extensions/procfs/processors/ProcFsMonitor.cpp     |  4 +-
 extensions/script/lua/LuaProcessSession.cpp        | 12 ++-
 extensions/script/lua/LuaProcessSession.h          | 32 --------
 extensions/script/python/PyProcessSession.cpp      | 10 ++-
 extensions/script/python/PyProcessSession.h        | 30 --------
 extensions/sensors/GetEnvironmentalSensors.cpp     |  3 +-
 extensions/sensors/GetMovementSensors.cpp          |  3 +-
 extensions/sensors/SensorBase.h                    | 21 ------
 extensions/sftp/processors/FetchSFTP.cpp           | 23 ++----
 extensions/sftp/processors/FetchSFTP.h             | 13 ----
 extensions/sftp/processors/PutSFTP.cpp             | 31 +++-----
 extensions/sftp/processors/PutSFTP.h               | 15 ----
 extensions/splunk/PutSplunkHTTP.cpp                |  7 +-
 extensions/sql/processors/FlowFileSource.cpp       |  3 +-
 .../processors/AttributesToJSON.cpp                |  3 +-
 .../processors/AttributesToJSON.h                  | 11 ---
 .../processors/DefragmentText.cpp                  | 37 ++--------
 .../processors/ExecuteProcess.cpp                  | 14 ++--
 .../processors/ExecuteProcess.h                    | 17 -----
 .../standard-processors/processors/ExtractText.cpp | 14 ++--
 .../standard-processors/processors/ExtractText.h   |  6 +-
 .../processors/GenerateFlowFile.cpp                |  6 +-
 .../processors/GenerateFlowFile.h                  | 13 ----
 .../standard-processors/processors/GetFile.cpp     |  3 +-
 .../standard-processors/processors/GetTCP.cpp      |  4 +-
 extensions/standard-processors/processors/GetTCP.h | 19 -----
 .../standard-processors/processors/HashContent.cpp | 27 +++----
 .../standard-processors/processors/HashContent.h   | 11 ---
 .../processors/LogAttribute.cpp                    |  7 +-
 .../standard-processors/processors/LogAttribute.h  | 19 -----
 .../standard-processors/processors/PutFile.cpp     |  4 +-
 .../standard-processors/processors/PutFile.h       |  6 +-
 .../standard-processors/processors/ReplaceText.cpp | 36 +--------
 .../standard-processors/processors/RouteText.cpp   |  6 +-
 .../standard-processors/processors/TailFile.cpp    | 12 +--
 .../tests/unit/DefragmentTextTests.cpp             | 16 +---
 .../tests/unit/GetFileTests.cpp                    |  2 +-
 extensions/usb-camera/GetUSBCamera.cpp             | 39 +++-------
 extensions/usb-camera/GetUSBCamera.h               | 15 +---
 .../CollectorInitiatedSubscription.cpp             | 18 +----
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 18 +----
 libminifi/include/core/ProcessSession.h            | 19 ++---
 .../include/core/ProcessSessionReadCallback.h      |  4 +-
 .../include/io/StreamCallback.h                    | 33 +++------
 libminifi/include/io/StreamPipe.h                  | 86 +++++-----------------
 .../include/serialization/FlowFileSerializer.h     |  5 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 54 --------------
 libminifi/include/utils/ByteArrayCallback.h        | 17 ++---
 libminifi/include/utils/FileReaderCallback.h       |  8 +-
 libminifi/include/utils/HTTPClient.h               |  2 +-
 libminifi/include/utils/JsonCallback.h             | 14 ++--
 .../utils/LineByLineInputOutputStreamCallback.h    |  4 +-
 libminifi/include/utils/StringUtils.h              |  1 +
 libminifi/src/c2/C2Agent.cpp                       |  2 +-
 libminifi/src/core/ProcessSession.cpp              | 74 +++++++++----------
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  2 +-
 .../src/serialization/FlowFileV3Serializer.cpp     |  3 +-
 libminifi/src/serialization/PayloadSerializer.cpp  |  3 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 13 +++-
 libminifi/src/utils/ByteArrayCallback.cpp          |  4 +-
 libminifi/src/utils/FileReaderCallback.cpp         | 29 ++++----
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  2 +-
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   | 21 +-----
 libminifi/test/TestBase.cpp                        |  2 +-
 libminifi/test/WriteToFlowFileTestProcessor.cpp    | 19 +----
 .../test/archive-tests/CompressContentTests.cpp    |  6 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    | 42 +++++------
 .../test/persistence-tests/PersistenceTests.cpp    |  3 +-
 .../test/unit/ContentRepositoryDependentTests.h    | 34 +++------
 libminifi/test/unit/FlowFileSerializationTests.cpp |  8 +-
 .../LineByLineInputOutputStreamCallbackTests.cpp   |  6 +-
 libminifi/test/unit/LoggerTests.cpp                |  2 +-
 126 files changed, 508 insertions(+), 1367 deletions(-)
 delete mode 100644 extensions/opencv/FrameIO.h
 copy extensions/script/ScriptProcessContext.h => libminifi/include/io/StreamCallback.h (59%)


[nifi-minifi-cpp] 03/03: MINIFICPP-1816 adapt bootstrap to Ubuntu 22.04

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2d978450419f008ce294382cfb5fb9894beb7ed9
Author: Marton Szasz <sz...@apache.org>
AuthorDate: Tue May 3 14:48:07 2022 +0200

    MINIFICPP-1816 adapt bootstrap to Ubuntu 22.04
    
    Closes #1319
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 aptitude.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/aptitude.sh b/aptitude.sh
index 23abe159d..08c57d63c 100644
--- a/aptitude.sh
+++ b/aptitude.sh
@@ -23,7 +23,7 @@ verify_enable_platform(){
 add_os_flags() {
     CC=gcc
     CXX=g++
-    if [[ "$OS" = Ubuntu* ]]; then
+    if [[ "$OS" = Ubuntu* && "$OS_MAJOR" -lt 22 ]]; then
         CC=gcc-11
         CXX=g++-11
     fi
@@ -33,7 +33,7 @@ add_os_flags() {
 }
 bootstrap_cmake(){
     ## on Ubuntu install the latest CMake
-    if [[ "$OS" = Ubuntu* ]]; then
+    if [[ "$OS" = Ubuntu* && "$OS_MAJOR" -lt 22 ]]; then
         echo "Adding KitWare CMake apt repository..."
         sudo apt-get update && sudo apt-get install -y apt-transport-https ca-certificates gnupg software-properties-common wget
         wget -O - https://apt.kitware.com/keys/kitware-archive-latest.asc 2>/dev/null | sudo apt-key add -
@@ -44,7 +44,7 @@ bootstrap_cmake(){
 build_deps(){
     ## need to account for debian
     compiler_pkgs="gcc g++"
-    if [[ "$OS" = Ubuntu* ]]; then
+    if [[ "$OS" = Ubuntu* && "$OS_MAJOR" -lt 22 ]]; then
         sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
         compiler_pkgs="gcc-11 g++-11"
     fi


[nifi-minifi-cpp] 01/03: MINIFICPP-1717 Refactor *StreamCallback to be function objects

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3070b855038a6f1ccd58b6537f4f5580681d3dd9
Author: Marton Szasz <sz...@apache.org>
AuthorDate: Tue May 3 14:44:53 2022 +0200

    MINIFICPP-1717 Refactor *StreamCallback to be function objects
    
    Closes #1236
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 extensions/aws/processors/FetchS3Object.cpp        | 28 ++++---
 extensions/aws/processors/FetchS3Object.h          | 21 ------
 extensions/aws/processors/PutS3Object.cpp          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  4 +-
 .../azure/processors/FetchAzureBlobStorage.cpp     | 40 +++-------
 .../azure/processors/FetchAzureDataLakeStorage.cpp | 13 +++-
 .../azure/processors/FetchAzureDataLakeStorage.h   | 28 -------
 .../azure/processors/PutAzureBlobStorage.cpp       |  2 +-
 extensions/azure/processors/PutAzureBlobStorage.h  |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  4 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  6 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  2 +-
 extensions/bustache/ApplyTemplate.cpp              | 58 ++++++---------
 extensions/bustache/ApplyTemplate.h                |  5 +-
 extensions/civetweb/processors/ListenHTTP.cpp      | 15 +---
 extensions/civetweb/processors/ListenHTTP.h        | 10 ---
 extensions/gcp/processors/PutGCSObject.cpp         |  6 +-
 extensions/http-curl/client/HTTPCallback.h         |  6 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |  8 +-
 extensions/http-curl/protocols/RESTSender.cpp      |  6 +-
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |  1 -
 extensions/jni/jvm/JniProcessSession.cpp           | 29 +++-----
 extensions/jni/jvm/JniReferenceObjects.h           | 21 +-----
 extensions/libarchive/CompressContent.cpp          | 14 ++--
 extensions/libarchive/CompressContent.h            | 62 ++++++----------
 extensions/libarchive/FocusArchiveEntry.cpp        |  9 +--
 extensions/libarchive/FocusArchiveEntry.h          |  5 +-
 extensions/libarchive/ManipulateArchive.cpp        |  7 +-
 extensions/libarchive/MergeContent.cpp             | 11 +--
 extensions/libarchive/MergeContent.h               | 12 +--
 extensions/libarchive/UnfocusArchiveEntry.cpp      |  4 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |  4 +-
 extensions/librdkafka/ConsumeKafka.cpp             | 10 +--
 extensions/librdkafka/ConsumeKafka.h               | 11 ---
 extensions/librdkafka/PublishKafka.cpp             |  8 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         | 17 ++++-
 extensions/mqtt/processors/ConsumeMQTT.h           | 22 ------
 extensions/mqtt/processors/ConvertHeartBeat.cpp    |  2 +-
 extensions/mqtt/processors/ConvertJSONAck.cpp      | 17 ++---
 extensions/mqtt/processors/ConvertJSONAck.h        |  1 -
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  5 +-
 extensions/opc/include/fetchopc.h                  | 13 +---
 extensions/opc/src/fetchopc.cpp                    |  5 +-
 extensions/opc/src/putopc.cpp                      |  1 +
 extensions/opencv/CaptureRTSPFrame.cpp             |  9 ++-
 extensions/opencv/CaptureRTSPFrame.h               | 32 +-------
 extensions/opencv/FrameIO.h                        | 79 --------------------
 extensions/opencv/MotionDetector.cpp               | 22 ++++--
 .../SourceInitiatedSubscriptionListener.cpp        | 25 +------
 .../SourceInitiatedSubscriptionListener.h          | 23 +-----
 extensions/pdh/PerformanceDataMonitor.cpp          |  4 +-
 extensions/procfs/processors/ProcFsMonitor.cpp     |  4 +-
 extensions/script/lua/LuaProcessSession.cpp        | 12 ++-
 extensions/script/lua/LuaProcessSession.h          | 32 --------
 extensions/script/python/PyProcessSession.cpp      | 10 ++-
 extensions/script/python/PyProcessSession.h        | 30 --------
 extensions/sensors/GetEnvironmentalSensors.cpp     |  3 +-
 extensions/sensors/GetMovementSensors.cpp          |  3 +-
 extensions/sensors/SensorBase.h                    | 21 ------
 extensions/sftp/processors/FetchSFTP.cpp           | 23 ++----
 extensions/sftp/processors/FetchSFTP.h             | 13 ----
 extensions/sftp/processors/PutSFTP.cpp             | 31 +++-----
 extensions/sftp/processors/PutSFTP.h               | 15 ----
 extensions/splunk/PutSplunkHTTP.cpp                |  7 +-
 extensions/sql/processors/FlowFileSource.cpp       |  3 +-
 .../processors/AttributesToJSON.cpp                |  3 +-
 .../processors/AttributesToJSON.h                  | 11 ---
 .../processors/DefragmentText.cpp                  | 37 ++--------
 .../processors/ExecuteProcess.cpp                  | 14 ++--
 .../processors/ExecuteProcess.h                    | 17 -----
 .../standard-processors/processors/ExtractText.cpp | 14 ++--
 .../standard-processors/processors/ExtractText.h   |  6 +-
 .../processors/GenerateFlowFile.cpp                |  6 +-
 .../processors/GenerateFlowFile.h                  | 13 ----
 .../standard-processors/processors/GetFile.cpp     |  3 +-
 .../standard-processors/processors/GetTCP.cpp      |  4 +-
 extensions/standard-processors/processors/GetTCP.h | 19 -----
 .../standard-processors/processors/HashContent.cpp | 27 +++----
 .../standard-processors/processors/HashContent.h   | 11 ---
 .../processors/LogAttribute.cpp                    |  7 +-
 .../standard-processors/processors/LogAttribute.h  | 19 -----
 .../standard-processors/processors/PutFile.cpp     |  4 +-
 .../standard-processors/processors/PutFile.h       |  6 +-
 .../standard-processors/processors/ReplaceText.cpp | 36 +--------
 .../standard-processors/processors/RouteText.cpp   |  6 +-
 .../standard-processors/processors/TailFile.cpp    | 12 +--
 .../tests/unit/DefragmentTextTests.cpp             | 16 +---
 extensions/usb-camera/GetUSBCamera.cpp             | 39 +++-------
 extensions/usb-camera/GetUSBCamera.h               | 15 +---
 .../CollectorInitiatedSubscription.cpp             | 18 +----
 .../windows-event-log/ConsumeWindowsEventLog.cpp   | 18 +----
 libminifi/include/core/ProcessSession.h            | 19 ++---
 .../include/core/ProcessSessionReadCallback.h      |  4 +-
 .../io/StreamCallback.h}                           | 27 +++----
 libminifi/include/io/StreamPipe.h                  | 86 +++++-----------------
 .../include/serialization/FlowFileSerializer.h     |  5 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 54 --------------
 libminifi/include/utils/ByteArrayCallback.h        | 17 ++---
 libminifi/include/utils/FileReaderCallback.h       |  8 +-
 libminifi/include/utils/HTTPClient.h               |  2 +-
 libminifi/include/utils/JsonCallback.h             | 14 ++--
 .../utils/LineByLineInputOutputStreamCallback.h    |  4 +-
 libminifi/include/utils/StringUtils.h              |  1 +
 libminifi/src/c2/C2Agent.cpp                       |  2 +-
 libminifi/src/core/ProcessSession.cpp              | 74 +++++++++----------
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  2 +-
 .../src/serialization/FlowFileV3Serializer.cpp     |  3 +-
 libminifi/src/serialization/PayloadSerializer.cpp  |  3 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 13 +++-
 libminifi/src/utils/ByteArrayCallback.cpp          |  4 +-
 libminifi/src/utils/FileReaderCallback.cpp         | 29 ++++----
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  2 +-
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   | 21 +-----
 libminifi/test/TestBase.cpp                        |  2 +-
 libminifi/test/WriteToFlowFileTestProcessor.cpp    | 19 +----
 .../test/archive-tests/CompressContentTests.cpp    |  6 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    | 42 +++++------
 .../test/persistence-tests/PersistenceTests.cpp    |  3 +-
 .../test/unit/ContentRepositoryDependentTests.h    | 34 +++------
 libminifi/test/unit/FlowFileSerializationTests.cpp |  8 +-
 .../LineByLineInputOutputStreamCallbackTests.cpp   |  6 +-
 libminifi/test/unit/LoggerTests.cpp                |  2 +-
 124 files changed, 507 insertions(+), 1354 deletions(-)

diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index e6c7abe2e..af93ebe55 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -26,6 +26,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
@@ -112,10 +113,13 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
     return;
   }
 
-  WriteCallback callback(*get_object_params, s3_wrapper_);
-  session->write(flow_file, &callback);
+  std::optional<minifi::aws::s3::GetObjectResult> result;
+  session->write(flow_file, [&get_object_params, &result, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    result = s3_wrapper_.getObject(*get_object_params, *stream);
+    return (result | minifi::utils::map(&s3::GetObjectResult::write_size)).value_or(0);
+  });
 
-  if (callback.result_) {
+  if (result) {
     auto putAttributeIfNotEmpty = [&](const std::string& attribute, const std::string& value) {
       if (!value.empty()) {
         session->putAttribute(flow_file, attribute, value);
@@ -124,15 +128,15 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
 
     logger_->log_debug("Successfully fetched S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
     session->putAttribute(flow_file, "s3.bucket", get_object_params->bucket);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::PATH, callback.result_->path);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, callback.result_->absolute_path);
-    session->putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, callback.result_->filename);
-    putAttributeIfNotEmpty(core::SpecialFlowAttribute::MIME_TYPE, callback.result_->mime_type);
-    putAttributeIfNotEmpty("s3.etag", callback.result_->etag);
-    putAttributeIfNotEmpty("s3.expirationTime", callback.result_->expiration.expiration_time);
-    putAttributeIfNotEmpty("s3.expirationTimeRuleId", callback.result_->expiration.expiration_time_rule_id);
-    putAttributeIfNotEmpty("s3.sseAlgorithm", callback.result_->ssealgorithm);
-    putAttributeIfNotEmpty("s3.version", callback.result_->version);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::PATH, result->path);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, result->absolute_path);
+    session->putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, result->filename);
+    putAttributeIfNotEmpty(core::SpecialFlowAttribute::MIME_TYPE, result->mime_type);
+    putAttributeIfNotEmpty("s3.etag", result->etag);
+    putAttributeIfNotEmpty("s3.expirationTime", result->expiration.expiration_time);
+    putAttributeIfNotEmpty("s3.expirationTimeRuleId", result->expiration.expiration_time_rule_id);
+    putAttributeIfNotEmpty("s3.sseAlgorithm", result->ssealgorithm);
+    putAttributeIfNotEmpty("s3.version", result->version);
     session->transfer(flow_file, Success);
   } else {
     logger_->log_error("Failed to fetch S3 object %s from bucket %s", get_object_params->object_key, get_object_params->bucket);
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 4a2411d7c..4c16e5e71 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -64,27 +64,6 @@ class FetchS3Object : public S3Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(const minifi::aws::s3::GetObjectRequestParameters& get_object_params, aws::s3::S3Wrapper& s3_wrapper)
-      : get_object_params_(get_object_params),
-        s3_wrapper_(s3_wrapper) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      result_ = s3_wrapper_.getObject(get_object_params_, *stream);
-      if (!result_) {
-        return 0;
-      }
-
-      return result_->write_size;
-    }
-
-    const minifi::aws::s3::GetObjectRequestParameters& get_object_params_;
-    aws::s3::S3Wrapper& s3_wrapper_;
-    std::optional<minifi::aws::s3::GetObjectResult> result_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/aws/processors/PutS3Object.cpp b/extensions/aws/processors/PutS3Object.cpp
index 4027b5a6a..09b9ba337 100644
--- a/extensions/aws/processors/PutS3Object.cpp
+++ b/extensions/aws/processors/PutS3Object.cpp
@@ -281,7 +281,7 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
   }
 
   PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   if (!callback.result_.has_value()) {
     logger_->log_error("Failed to upload S3 object to bucket '%s'", put_s3_request_params->bucket);
     session->transfer(flow_file, Failure);
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 0ba71b187..2d1575502 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -78,7 +78,7 @@ class PutS3Object : public S3Processor {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     static const uint64_t MAX_SIZE;
     static const uint64_t BUFFER_SIZE;
@@ -89,7 +89,7 @@ class PutS3Object : public S3Processor {
       , s3_wrapper_(s3_wrapper) {
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       if (flow_size_ > MAX_SIZE) {
         return -1;
       }
diff --git a/extensions/azure/processors/FetchAzureBlobStorage.cpp b/extensions/azure/processors/FetchAzureBlobStorage.cpp
index 31f987db1..5359ce599 100644
--- a/extensions/azure/processors/FetchAzureBlobStorage.cpp
+++ b/extensions/azure/processors/FetchAzureBlobStorage.cpp
@@ -42,34 +42,6 @@ const core::Property FetchAzureBlobStorage::RangeLength(
 const core::Relationship FetchAzureBlobStorage::Success("success", "All successfully processed FlowFiles are routed to this relationship");
 const core::Relationship FetchAzureBlobStorage::Failure("failure", "Unsuccessful operations will be transferred to the failure relationship");
 
-namespace {
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(storage::AzureBlobStorage& azure_blob_storage, const storage::FetchAzureBlobStorageParameters& params)
-    : azure_blob_storage_(azure_blob_storage),
-      params_(params) {
-  }
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    result_size_ = azure_blob_storage_.fetchBlob(params_, *stream);
-    if (!result_size_) {
-      return 0;
-    }
-
-    return gsl::narrow<int64_t>(*result_size_);
-  }
-
-  [[nodiscard]] auto getResult() const {
-    return result_size_;
-  }
-
- private:
-  storage::AzureBlobStorage& azure_blob_storage_;
-  const storage::FetchAzureBlobStorageParameters& params_;
-  std::optional<uint64_t> result_size_ = std::nullopt;
-};
-}  // namespace
-
 void FetchAzureBlobStorage::initialize() {
   setSupportedProperties({
     AzureStorageCredentialsService,
@@ -127,10 +99,16 @@ void FetchAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext
   }
 
   auto fetched_flow_file = session->create(flow_file);
-  WriteCallback callback(azure_blob_storage_, *params);
-  session->write(fetched_flow_file, &callback);
+  std::optional<int64_t> result_size;
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    result_size = azure_blob_storage_.fetchBlob(*params, *stream);
+    if (!result_size) {
+      return 0;
+    }
+    return gsl::narrow<int64_t>(*result_size);
+  });
 
-  if (callback.getResult() == std::nullopt) {
+  if (result_size == std::nullopt) {
     logger_->log_error("Failed to fetch blob '%s' from Azure Blob storage", params->blob_name);
     session->transfer(flow_file, Failure);
     session->remove(fetched_flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
index 007147770..c27eb19ab 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -109,10 +109,17 @@ void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessCon
   }
 
   auto fetched_flow_file = session->create(flow_file);
-  WriteCallback callback(azure_data_lake_storage_, *params, logger_);
-  session->write(fetched_flow_file, &callback);
+  std::optional<uint64_t> result;
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    result = azure_data_lake_storage_.fetchFile(*params, *output_stream);
+    if (!result) {
+      return 0;
+    }
+
+    return gsl::narrow<int64_t>(*result);
+  });
 
-  if (callback.getResult() == std::nullopt) {
+  if (result == std::nullopt) {
     logger_->log_error("Failed to fetch file '%s' from Azure Data Lake storage", params->filename);
     session->transfer(flow_file, Failure);
     session->remove(fetched_flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 3c2d08374..ec84b699b 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -55,34 +55,6 @@ class FetchAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessor
  private:
   friend class ::AzureDataLakeStorageTestsFixture<FetchAzureDataLakeStorage>;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::FetchAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger)
-      : azure_data_lake_storage_(azure_data_lake_storage),
-        params_(params),
-        logger_(std::move(logger)) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      result_size_ = azure_data_lake_storage_.fetchFile(params_, *stream);
-      if (!result_size_) {
-        return 0;
-      }
-
-      return gsl::narrow<int64_t>(*result_size_);
-    }
-
-    auto getResult() const {
-      return result_size_;
-    }
-
-   private:
-    storage::AzureDataLakeStorage& azure_data_lake_storage_;
-    const storage::FetchAzureDataLakeStorageParameters& params_;
-    std::optional<uint64_t> result_size_ = std::nullopt;
-    std::shared_ptr<core::logging::Logger> logger_;
-  };
-
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
   }
diff --git a/extensions/azure/processors/PutAzureBlobStorage.cpp b/extensions/azure/processors/PutAzureBlobStorage.cpp
index 2e741b953..5cb70767b 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.cpp
+++ b/extensions/azure/processors/PutAzureBlobStorage.cpp
@@ -99,7 +99,7 @@ void PutAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext>
     }
   }
   PutAzureBlobStorage::ReadCallback callback(flow_file->getSize(), azure_blob_storage_, *params);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   const std::optional<storage::UploadBlobResult> upload_result = callback.getResult();
 
   if (!upload_result) {
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index 52e0ca22e..1a25fbf7a 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -53,7 +53,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override;
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, storage::AzureBlobStorage& azure_blob_storage, const storage::PutAzureBlobStorageParameters& params)
       : flow_size_(flow_size)
@@ -61,7 +61,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageProcessorBase {
       , params_(params) {
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       std::vector<std::byte> buffer;
       buffer.resize(flow_size_);
       size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index bf53088bd..7df24ec4a 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -100,7 +100,7 @@ void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessConte
   }
 
   PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), azure_data_lake_storage_, *params, logger_);
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   const storage::UploadDataLakeStorageResult result = callback.getResult();
 
   if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) {
@@ -138,7 +138,7 @@ PutAzureDataLakeStorage::ReadCallback::ReadCallback(
     logger_(std::move(logger)) {
 }
 
-int64_t PutAzureDataLakeStorage::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutAzureDataLakeStorage::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   std::vector<std::byte> buffer;
   buffer.resize(flow_size_);
   size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 4dddcf6cd..d6d312892 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -61,12 +61,12 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
  private:
   friend class ::AzureDataLakeStorageTestsFixture<PutAzureDataLakeStorage>;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
-    storage::UploadDataLakeStorageResult getResult() const {
+    [[nodiscard]] storage::UploadDataLakeStorageResult getResult() const {
       return result_;
     }
 
diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp
index fb00421ff..14e26b959 100644
--- a/extensions/azure/storage/AzureBlobStorage.cpp
+++ b/extensions/azure/storage/AzureBlobStorage.cpp
@@ -78,7 +78,7 @@ bool AzureBlobStorage::deleteBlob(const DeleteAzureBlobStorageParameters& params
 std::optional<uint64_t> AzureBlobStorage::fetchBlob(const FetchAzureBlobStorageParameters& params, io::BaseStream& stream) {
   try {
     auto fetch_res = blob_storage_client_->fetchBlob(params);
-    return internal::pipe(fetch_res.get(), &stream);
+    return internal::pipe(*fetch_res, stream);
   } catch (const std::exception& ex) {
     logger_->log_error("An exception occurred while fetching blob '%s' of container '%s': %s", params.blob_name, params.container_name, ex.what());
     return std::nullopt;
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 0ba2aeb73..1546b8f05 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -95,7 +95,7 @@ bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters
 std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) {
   try {
     auto result = data_lake_storage_client_->fetchFile(params);
-    return internal::pipe(result.get(), &stream);
+    return internal::pipe(*result, stream);
   } catch (const std::exception& ex) {
     logger_->log_error("An exception occurred while fetching '%s/%s' of filesystem '%s': %s", params.directory_name, params.filename, params.file_system_name, ex.what());
     return std::nullopt;
diff --git a/extensions/bustache/ApplyTemplate.cpp b/extensions/bustache/ApplyTemplate.cpp
index 1107f8db3..4b2041566 100644
--- a/extensions/bustache/ApplyTemplate.cpp
+++ b/extensions/bustache/ApplyTemplate.cpp
@@ -34,41 +34,6 @@ namespace org::apache::nifi::minifi::processors {
 const core::Property ApplyTemplate::Template("Template", "Path to the input mustache template file", "");
 const core::Relationship ApplyTemplate::Success("success", "success operational on the flow record");
 
-namespace {
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(std::filesystem::path templateFile, const core::FlowFile& flow_file)
-      :template_file_{std::move(templateFile)}, flow_file_{flow_file}
-  {}
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    logger_->log_info("ApplyTemplate reading template file from %s", template_file_);
-    // TODO(szaszm): we might want to return to memory-mapped input files when the next todo is done. Until then, the agents stores the whole result in memory anyway, so no point in not doing the same
-    // with the template file itself
-    const auto template_file_contents = [this] {
-      std::ifstream ifs{template_file_};
-      return std::string{std::istreambuf_iterator<char>{ifs}, std::istreambuf_iterator<char>{}};
-    }();
-
-    bustache::format format(template_file_contents);
-    bustache::object data;
-
-    for (const auto &attr : flow_file_.getAttributes()) {
-      data[attr.first] = attr.second;
-    }
-
-    // TODO(calebj) write ostream reciever for format() to prevent excessive copying
-    std::string ostring = to_string(format(data));
-    stream->write(gsl::make_span(ostring).as_span<const std::byte>());
-    return gsl::narrow<int64_t>(ostring.length());
-  }
-
- private:
-  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ApplyTemplate>::getLogger();
-  std::filesystem::path template_file_;
-  const core::FlowFile& flow_file_;
-};
-}  // namespace
-
 void ApplyTemplate::initialize() {
   setSupportedProperties({Template});
   setSupportedRelationships({Success});
@@ -83,8 +48,27 @@ void ApplyTemplate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
 
   std::string template_file;
   context->getProperty(Template, template_file, flow_file);
-  WriteCallback cb(template_file, *flow_file);
-  session->write(flow_file, &cb);
+  session->write(flow_file, [&template_file, &flow_file, this](const auto& output_stream) {
+    logger_->log_info("ApplyTemplate reading template file from %s", template_file);
+    // TODO(szaszm): we might want to return to memory-mapped input files when the next todo is done. Until then, the agents stores the whole result in memory anyway, so no point in not doing the same
+    // with the template file itself
+    const auto template_file_contents = [&] {
+      std::ifstream ifs{template_file};
+      return std::string{std::istreambuf_iterator<char>{ifs}, std::istreambuf_iterator<char>{}};
+    }();
+
+    bustache::format format(template_file_contents);
+    bustache::object data;
+
+    for (const auto &attr : flow_file->getAttributes()) {
+      data[attr.first] = attr.second;
+    }
+
+    // TODO(calebj) write ostream reciever for format() to prevent excessive copying
+    std::string ostring = to_string(format(data));
+    output_stream->write(gsl::make_span(ostring).as_span<const std::byte>());
+    return gsl::narrow<int64_t>(ostring.length());
+  });
   session->transfer(flow_file, Success);
 }
 
diff --git a/extensions/bustache/ApplyTemplate.h b/extensions/bustache/ApplyTemplate.h
index 5cc1d5a63..e0033172b 100644
--- a/extensions/bustache/ApplyTemplate.h
+++ b/extensions/bustache/ApplyTemplate.h
@@ -35,16 +35,15 @@ class ApplyTemplate : public core::Processor {
  public:
   explicit ApplyTemplate(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid) {}
-  ~ApplyTemplate() override = default;
   static constexpr char const *ProcessorName = "ApplyTemplate";
 
   static const core::Property Template;
 
   static const core::Relationship Success;
 
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context,
-                 const std::shared_ptr<core::ProcessSession> &session) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
+
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ApplyTemplate>::getLogger();
 };
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index d88f0082a..52a0ae55e 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -281,8 +281,10 @@ void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
     session->add(flow_file);
 
     if (flow_file_buffer_pair.second) {
-      WriteCallback callback(std::move(flow_file_buffer_pair.second));
-      session->write(flow_file, &callback);
+      session->write(flow_file, [request_content = flow_file_buffer_pair.second.get()](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+        const auto write_ret = stream->write(request_content->getBuffer());
+        return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
+      });
     }
 
     session->transfer(flow_file, Success);
@@ -504,15 +506,6 @@ std::unique_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struc
   return content_buffer;
 }
 
-ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> request_content)
-    : request_content_(std::move(request_content)) {
-}
-
-int64_t ListenHTTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  const auto write_ret = stream->write(request_content_->getBuffer());
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 bool ListenHTTP::isSecure() const {
   return (listeningPort.length() > 0) && *listeningPort.rbegin() == 's';
 }
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index 76c9e2c32..683fc0dc8 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -126,16 +126,6 @@ class ListenHTTP : public core::Processor {
     utils::ConcurrentQueue<FlowFileBufferPair> request_buffer_;
   };
 
-  // Write callback for transferring data from HTTP request to content repo
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(std::unique_ptr<io::BufferStream>);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::unique_ptr<io::BufferStream> request_content_;
-  };
-
   static int logMessage(const struct mg_connection *conn, const char *message) {
     try {
       struct mg_context* ctx = mg_get_context(conn);
diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp
index 9fadfedfe..00a3d0c26 100644
--- a/extensions/gcp/processors/PutGCSObject.cpp
+++ b/extensions/gcp/processors/PutGCSObject.cpp
@@ -114,7 +114,7 @@ const core::Relationship PutGCSObject::Failure("failure", "Files that could not
 
 
 namespace {
-class UploadToGCSCallback : public InputStreamCallback {
+class UploadToGCSCallback {
  public:
   UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
       : bucket_(std::move(bucket)),
@@ -122,7 +122,7 @@ class UploadToGCSCallback : public InputStreamCallback {
         client_(client) {
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     std::string content;
     content.resize(stream->size());
     const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>());
@@ -283,7 +283,7 @@ void PutGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& contex
 
   callback.setEncryptionKey(encryption_key_);
 
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::ref(callback));
   auto& result = callback.getResult();
   if (!result.ok()) {
     flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason());
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 83df0b31f..c05ad61d1 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -53,7 +53,7 @@ namespace utils {
  *  - because of this, all functions that request data at a specific offset are implicit seeks and potentially modify
  *    the current buffer
  */
-class HttpStreamingCallback : public ByteInputCallBack {
+class HttpStreamingCallback final : public ByteInputCallback {
  public:
   HttpStreamingCallback()
       : is_alive_(true),
@@ -78,7 +78,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
     seekInner(lock, pos);
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override {
     std::vector<std::byte> vec;
 
     if (stream->size() > 0) {
@@ -89,7 +89,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
     return processInner(std::move(vec));
   }
 
-  virtual int64_t process(const uint8_t* data, size_t size) {
+  int64_t process(const uint8_t* data, size_t size) {
     std::vector<std::byte> vec;
     vec.resize(size);
     memcpy(vec.data(), data, size);
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 1604ef288..3a4516883 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -294,7 +294,7 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   std::string tx_id = generateId();
 
   // Note: callback must be declared before callbackObj so that they are destructed in the correct order
-  std::unique_ptr<utils::ByteInputCallBack> callback = nullptr;
+  std::unique_ptr<utils::ByteInputCallback> callback = nullptr;
   std::unique_ptr<utils::HTTPUploadCallback> callbackObj = nullptr;
 
   // Client declared after the callbacks to make sure the callbacks are still available when the client is destructed
@@ -324,11 +324,11 @@ void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
     logger_->log_trace("InvokeHTTP -- reading flowfile");
     std::shared_ptr<ResourceClaim> claim = flowFile->getResourceClaim();
     if (claim) {
-      callback = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack());
+      callback = std::make_unique<utils::ByteInputCallback>();
       if (send_body_) {
-        session->read(flowFile, callback.get());
+        session->read(flowFile, std::ref(*callback));
       }
-      callbackObj = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback);
+      callbackObj = std::make_unique<utils::HTTPUploadCallback>();
       callbackObj->ptr = callback.get();
       callbackObj->pos = 0;
       logger_->log_trace("InvokeHTTP -- Setting callback, size is %d", callback->getBufferSize());
diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp
index af15b4dbf..c94e1c0a4 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -89,7 +89,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
   }
 
   // Callback for transmit. Declared in order to destruct in proper order - take care!
-  std::vector<std::unique_ptr<utils::ByteInputCallBack>> inputs;
+  std::vector<std::unique_ptr<utils::ByteInputCallback>> inputs;
   std::vector<std::unique_ptr<utils::HTTPUploadCallback>> callbacks;
 
   // Callback for transfer. Declared in order to destruct in proper order - take care!
@@ -113,7 +113,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
         if (filename.empty()) {
           throw std::logic_error("Missing filename");
         }
-        auto file_input = std::make_unique<utils::ByteInputCallBack>();
+        auto file_input = std::make_unique<utils::ByteInputCallback>();
         auto file_cb = std::make_unique<utils::HTTPUploadCallback>();
         file_input->write(file.getRawDataAsString());
         file_cb->ptr = file_input.get();
@@ -122,7 +122,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi
         callbacks.push_back(std::move(file_cb));
       }
     } else {
-      auto data_input = std::make_unique<utils::ByteInputCallBack>();
+      auto data_input = std::make_unique<utils::ByteInputCallback>();
       auto data_cb = std::make_unique<utils::HTTPUploadCallback>();
       data_input->write(data.value_or(""));
       data_cb->ptr = data_input.get();
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index eb850f6fb..159aad707 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -49,7 +49,6 @@ class HttpTestHarness : public HTTPIntegrationBase {
     LogTestController::getInstance().setTrace<minifi::processors::InvokeHTTP>();
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP>();
-    LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP::WriteCallback>();
     LogTestController::getInstance().setDebug<minifi::processors::ListenHTTP::Handler>();
     LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
     LogTestController::getInstance().setDebug<core::Processor>();
diff --git a/extensions/jni/jvm/JniProcessSession.cpp b/extensions/jni/jvm/JniProcessSession.cpp
index a82d44e36..5f4f0105c 100644
--- a/extensions/jni/jvm/JniProcessSession.cpp
+++ b/extensions/jni/jvm/JniProcessSession.cpp
@@ -18,21 +18,16 @@
 
 #include "JniProcessSession.h"
 
-#include <string>
 #include <memory>
+#include <string>
 #include <algorithm>
-#include <iterator>
 #include <set>
 #include <utility>
-#include "core/Property.h"
-#include "io/validation.h"
 #include "utils/StringUtils.h"
 #include "utils/file/FileUtils.h"
-#include "properties/Configure.h"
 #include "JVMLoader.h"
 #include "JniReferenceObjects.h"
 
-#include "core/Processor.h"
 #include "JniFlowFile.h"
 #include "../JavaException.h"
 
@@ -76,21 +71,15 @@ JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_readF
   minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
   if (ptr->get()) {
     auto jincls = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniInputStream", env);
-
     auto jin = jincls.newInstance(env);
-
     minifi::jni::ThrowIf(env);
 
-    std::unique_ptr<minifi::jni::JniByteInputStream> callback = std::unique_ptr<minifi::jni::JniByteInputStream>(new minifi::jni::JniByteInputStream(4096));
-
-    session->getSession()->read(ptr->get(), callback.get());
-
+    auto callback = std::make_unique<minifi::jni::JniByteInputStream>(4096);
+    session->getSession()->read(ptr->get(), std::ref(*callback));
     auto jniInpuStream = std::make_shared<minifi::jni::JniInputStream>(std::move(callback), jin, session->getServicer());
-
     session->addInputStream(jniInpuStream);
 
     minifi::jni::JVMLoader::getInstance()->setReference(jin, env, jniInpuStream.get());
-
     return jin;
   }
 
@@ -133,8 +122,9 @@ JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_writ
     jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
     jsize length = env->GetArrayLength(byteArray);
 
-    minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
-    session->getSession()->write(ptr->get(), &outStream);
+    if (length > 0) {
+      session->getSession()->writeBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length)));
+    }
 
     env->ReleaseByteArrayElements(byteArray, buffer, 0);
 
@@ -436,16 +426,15 @@ JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_appe
     return false;
   }
   THROW_IF((ff == nullptr || byteArray == nullptr), env, NO_FF_OBJECT);
-  minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
-  minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
+  auto *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
+  auto *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
 
   if (ptr->get()) {
     jbyte* buffer = env->GetByteArrayElements(byteArray, 0);
     jsize length = env->GetArrayLength(byteArray);
 
     if (length > 0) {
-      minifi::jni::JniByteOutStream outStream(buffer, (size_t) length);
-      session->getSession()->append(ptr->get(), &outStream);
+      session->getSession()->appendBuffer(ptr->get(), gsl::make_span(reinterpret_cast<std::byte*>(buffer), gsl::narrow<size_t>(length)));
     }
 
     env->ReleaseByteArrayElements(byteArray, buffer, 0);
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 66e98333a..366e89546 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -96,33 +96,16 @@ struct check_empty_ff : public std::unary_function<std::shared_ptr<JniFlowFile>,
   }
 };
 
-class JniByteOutStream : public minifi::OutputStreamCallback {
- public:
-  JniByteOutStream(jbyte *bytes, size_t length)
-      : bytes_(bytes),
-        length_(length) {
-  }
-
-  virtual ~JniByteOutStream() = default;
-  virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
-    const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(bytes_), length_);
-    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-  }
- private:
-  jbyte *bytes_;
-  size_t length_;
-};
-
 /**
  * Jni byte input stream
  */
-class JniByteInputStream : public minifi::InputStreamCallback {
+class JniByteInputStream {
  public:
   explicit JniByteInputStream(uint64_t size)
       : buffer_(size),
         read_size_(0) {
   }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     stream_ = stream;
     return 0;
   }
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 874eb72c0..c03e63809 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -187,7 +187,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
         if (!compressor.newEntry({filename, in->size()})) {
           return -1;
         }
-        return internal::pipe(in.get(), &compressor);
+        return internal::pipe(*in, compressor);
       };
     } else {
       transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
@@ -196,7 +196,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
           success = false;
           return 0;  // prevents a session rollback
         }
-        auto ret = internal::pipe(&decompressor, out.get());
+        auto ret = internal::pipe(decompressor, *out);
         if (ret < 0) {
           success = false;
           return 0;  // prevents a session rollback
@@ -204,14 +204,14 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
         return ret;
       };
     }
-    session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
-      return session->read(flowFile, FunctionInputStreamCallback([&] (const auto& in) {
+    session->write(result, [&] (const auto& out) {
+      return session->read(flowFile, [&] (const auto& in) {
         return transformer(in, out);
-      }));
-    }));
+      });
+    });
   } else {
     CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
-    session->write(result, &callback);
+    session->write(result, std::ref(callback));
     success = callback.success_;
   }
 
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index b485d52f6..f41adba00 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -88,7 +88,7 @@ class CompressContent : public core::Processor {
   )
 
  public:
-  class GzipWriteCallback : public OutputStreamCallback {
+  class GzipWriteCallback {
    public:
     GzipWriteCallback(CompressionMode compress_mode, int compress_level, std::shared_ptr<core::FlowFile> flow, std::shared_ptr<core::ProcessSession> session)
       : compress_mode_(std::move(compress_mode))
@@ -104,47 +104,33 @@ class CompressContent : public core::Processor {
     std::shared_ptr<core::ProcessSession> session_;
     bool success_{false};
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& outputStream) override {
-      class ReadCallback : public InputStreamCallback {
-       public:
-        ReadCallback(GzipWriteCallback& writer, std::shared_ptr<io::OutputStream> outputStream)
-          : writer_(writer)
-          , outputStream_(std::move(outputStream)) {
-        }
-
-        int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override {
-          std::vector<std::byte> buffer(16 * 1024U);
-          size_t read_size = 0;
-          while (read_size < writer_.flow_->getSize()) {
-            const auto ret = inputStream->read(buffer);
-            if (io::isError(ret)) {
-              return -1;
-            } else if (ret == 0) {
-              break;
-            } else {
-              const auto writeret = outputStream_->write(gsl::make_span(buffer).subspan(0, ret));
-              if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
-                return -1;
-              }
-              read_size += ret;
-            }
-          }
-          outputStream_->close();
-          return gsl::narrow<int64_t>(read_size);
-        }
-
-        GzipWriteCallback& writer_;
-        std::shared_ptr<io::OutputStream> outputStream_;
-      };
-
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
       std::shared_ptr<io::ZlibBaseStream> filterStream;
       if (compress_mode_ == CompressionMode::Compress) {
-        filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
+        filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
       } else {
-        filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(outputStream.get()), io::ZlibCompressionFormat::GZIP);
+        filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP);
       }
-      ReadCallback readCb(*this, filterStream);
-      session_->read(flow_, &readCb);
+      session_->read(flow_, [this, &filterStream](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+        std::vector<std::byte> buffer(16 * 1024U);
+        size_t read_size = 0;
+        while (read_size < flow_->getSize()) {
+          const auto ret = input_stream->read(buffer);
+          if (io::isError(ret)) {
+            return -1;
+          } else if (ret == 0) {
+            break;
+          } else {
+            const auto writeret = filterStream->write(gsl::make_span(buffer).subspan(0, ret));
+            if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
+              return -1;
+            }
+            read_size += ret;
+          }
+        }
+        filterStream->close();
+        return gsl::narrow<int64_t>(read_size);
+      });
 
       success_ = filterStream->isFinished();
 
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 43caa9cfa..b38378b9c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -72,8 +72,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
   context->getProperty(Path.getName(), archiveMetadata.focusedEntry);
   flowFile->getAttribute("filename", archiveMetadata.archiveName);
 
-  ReadCallback cb(this, &file_man, &archiveMetadata);
-  session->read(flowFile, &cb);
+  session->read(flowFile, ReadCallback{this, &file_man, &archiveMetadata});
 
   // For each extracted entry, import & stash to key
   std::string targetEntryStashKey;
@@ -164,7 +163,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
   return gsl::narrow<la_ssize_t>(read);
 }
 
-int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   auto inputArchive = archive_read_new();
   struct archive_entry *entry;
   int64_t nlen = 0;
@@ -184,7 +183,7 @@ int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseS
     return nlen;
   }
 
-  while (isRunning()) {
+  while (proc_->isRunning()) {
     res = archive_read_next_header(inputArchive, &entry);
 
     if (res == ARCHIVE_EOF) {
@@ -255,8 +254,6 @@ FocusArchiveEntry::ReadCallback::ReadCallback(core::Processor *processor, utils:
   _archiveMetadata = archiveMetadata;
 }
 
-FocusArchiveEntry::ReadCallback::~ReadCallback() = default;
-
 REGISTER_RESOURCE(FocusArchiveEntry, "Allows manipulation of entries within an archive (e.g. TAR) by focusing on one entry within the archive at a time. "
     "When an archive entry is focused, that entry is treated as the content of the FlowFile and may be manipulated independently of the rest of the archive."
     " To restore the FlowFile to its original state, use UnfocusArchiveEntry.");
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 1c35e2b5e..9b5d3e16e 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -64,11 +64,10 @@ class FocusArchiveEntry : public core::Processor {
   //! Initialize, over write by NiFi FocusArchiveEntry
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     explicit ReadCallback(core::Processor*, utils::file::FileManager *file_man, ArchiveMetadata *archiveMetadata);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
     bool isRunning() {return proc_->isRunning();}
 
    private:
diff --git a/extensions/libarchive/ManipulateArchive.cpp b/extensions/libarchive/ManipulateArchive.cpp
index cabf103b7..b62dcbab2 100644
--- a/extensions/libarchive/ManipulateArchive.cpp
+++ b/extensions/libarchive/ManipulateArchive.cpp
@@ -126,8 +126,7 @@ void ManipulateArchive::onTrigger(core::ProcessContext* /*context*/, core::Proce
     ArchiveMetadata archiveMetadata;
     utils::file::FileManager file_man;
 
-    FocusArchiveEntry::ReadCallback readCallback(this, &file_man, &archiveMetadata);
-    session->read(flowFile, &readCallback);
+    session->read(flowFile, FocusArchiveEntry::ReadCallback{this, &file_man, &archiveMetadata});
 
     auto entries_end = archiveMetadata.entryMetadata.end();
 
@@ -210,9 +209,7 @@ void ManipulateArchive::onTrigger(core::ProcessContext* /*context*/, core::Proce
         archiveMetadata.entryMetadata.insert(position, touchEntry);
     }
 
-    UnfocusArchiveEntry::WriteCallback writeCallback(&archiveMetadata);
-    session->write(flowFile, &writeCallback);
-
+    session->write(flowFile, UnfocusArchiveEntry::WriteCallback{&archiveMetadata});
     session->transfer(flowFile, Success);
 }
 
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 15a508694..d0f4461ab 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -295,7 +295,7 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
     return false;
   }
 
-  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
     return session->read(ff, cb);
   };
 
@@ -349,8 +349,7 @@ BinaryConcatenationMerge::BinaryConcatenationMerge(const std::string &header, co
 
 void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  BinaryConcatenationMerge::WriteCallback callback(header_, footer_, demarcator_, flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, BinaryConcatenationMerge::WriteCallback{header_, footer_, demarcator_, flows, serializer});
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
@@ -363,8 +362,7 @@ void BinaryConcatenationMerge::merge(core::ProcessContext* /*context*/, core::Pr
 
 void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_TAR_VALUE, flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
@@ -380,8 +378,7 @@ void TarMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *se
 
 void ZipMerge::merge(core::ProcessContext* /*context*/, core::ProcessSession *session,
     std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, serializer);
-  session->write(merge_flow, &callback);
+  session->write(merge_flow, ArchiveMerge::WriteCallback{merge_content_options::MERGE_FORMAT_ZIP_VALUE, flows, serializer});
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index ca139cf0f..7cb6b24fe 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -21,6 +21,7 @@
 
 #include <deque>
 #include <map>
+#include <utility>
 #include <vector>
 #include <memory>
 #include <string>
@@ -72,7 +73,7 @@ class BinaryConcatenationMerge : public MergeBin {
   void merge(core::ProcessContext *context, core::ProcessSession *session,
       std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) override;
   // Nest Callback Class for write stream
-  class WriteCallback: public OutputStreamCallback {
+  class WriteCallback {
    public:
     WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
         std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) :
@@ -85,7 +86,7 @@ class BinaryConcatenationMerge : public MergeBin {
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
     FlowFileSerializer& serializer_;
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
       size_t write_size_sum = 0;
       if (!header_.empty()) {
         const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(header_.data()), header_.size());
@@ -160,16 +161,15 @@ class ArchiveMerge {
     bool header_emitted_{false};
   };
   // Nest Callback Class for write stream
-  class WriteCallback: public OutputStreamCallback {
+  class WriteCallback {
    public:
     WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
-        : merge_type_(merge_type),
+        : merge_type_(std::move(merge_type)),
           flows_(flows),
           serializer_(serializer) {
       size_ = 0;
       stream_ = nullptr;
     }
-    ~WriteCallback() override = default;
 
     std::string merge_type_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
@@ -199,7 +199,7 @@ class ArchiveMerge {
       return totalWrote;
     }
 
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       struct archive *arch;
 
       arch = archive_write_new();
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index c32d2a600..2f6c1ac74 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -138,7 +138,7 @@ void UnfocusArchiveEntry::onTrigger(core::ProcessContext *context, core::Process
 
   // Create archive by restoring each entry in the archive from tmp files
   WriteCallback cb(&lensArchiveMetadata);
-  session->write(flowFile, &cb);
+  session->write(flowFile, std::cref(cb));
 
   // Transfer to the relationship
   session->transfer(flowFile, Success);
@@ -158,7 +158,7 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *
   return io::isError(write_ret) ? -1 : gsl::narrow<la_ssize_t>(write_ret);
 }
 
-int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   auto outputArchive = archive_write_new();
   int64_t nlen = 0;
 
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index 6ada17eb3..905e886f5 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -64,10 +64,10 @@ class UnfocusArchiveEntry : public core::Processor {
   void initialize() override;
 
   //! Write callback for reconstituting lensed archive into flow file content
-  class WriteCallback : public OutputStreamCallback {
+  class WriteCallback {
    public:
     explicit WriteCallback(ArchiveMetadata *archiveMetadata);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
    private:
     //! Logger
     std::shared_ptr<Logger> logger_ = core::logging::LoggerFactory<UnfocusArchiveEntry>::getLogger();
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index 555d6e6d8..5182db4bb 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -493,8 +493,7 @@ std::optional<std::vector<std::shared_ptr<FlowFileRecord>>> ConsumeKafka::transf
         return {};
       }
       // flowfile content is consumed here
-      WriteCallback stream_writer_callback(&flowfile_content[0], flowfile_content.size());
-      session.write(flow_file, &stream_writer_callback);
+      session.writeBuffer(flow_file, flowfile_content);
       for (const auto& kv : attributes_from_headers) {
         flow_file->setAttribute(kv.first, kv.second);
       }
@@ -538,13 +537,6 @@ void ConsumeKafka::onTrigger(core::ProcessContext* /* context */, core::ProcessS
   process_pending_messages(*session);
 }
 
-int64_t ConsumeKafka::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!data_) return 0;
-  const auto write_ret = stream->write(data_, dataSize_);
-  if (io::isError(write_ret)) return -1;
-  return gsl::narrow<int64_t>(write_ret);
-}
-
 REGISTER_RESOURCE(ConsumeKafka, "Consumes messages from Apache Kafka and transform them into MiNiFi FlowFiles. "
     "The application should make sure that the processor is triggered at regular intervals, even if no messages are expected, "
     "to serve any queued callbacks waiting to be called. Rebalancing can also only happen on trigger."); // NOLINT
diff --git a/extensions/librdkafka/ConsumeKafka.h b/extensions/librdkafka/ConsumeKafka.h
index bb7ebee2d..208bd1988 100644
--- a/extensions/librdkafka/ConsumeKafka.h
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -141,17 +141,6 @@ class ConsumeKafka : public KafkaProcessorBase {
   void process_pending_messages(core::ProcessSession& session);
 
  private:
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(char *data, uint64_t size) :
-        data_(reinterpret_cast<uint8_t*>(data)),
-        dataSize_(size) {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
-   private:
-    uint8_t* data_;
-    uint64_t dataSize_;
-  };
-
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 39111795b..72756be72 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -262,7 +262,7 @@ class PublishKafka::Messages {
 };
 
 namespace {
-class ReadCallback : public InputStreamCallback {
+class ReadCallback {
  public:
   struct rd_kafka_headers_deleter {
     void operator()(rd_kafka_headers_t* ptr) const noexcept {
@@ -357,7 +357,7 @@ class ReadCallback : public InputStreamCallback {
   ReadCallback(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback) = delete;
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     std::vector<std::byte> buffer;
 
     buffer.resize(max_seg_size_);
@@ -870,11 +870,11 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
 
     ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
                                         attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
-    session->read(flowFile, &callback);
+    session->read(flowFile, std::ref(callback));
 
     if (!callback.called_) {
       // workaround: call callback since ProcessSession doesn't do so for empty flow files without resource claims
-      callback.process(nullptr);
+      callback(nullptr);
     }
 
     if (flowFile->getSize() == 0 && failEmptyFlowFiles) {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index ba60d2a80..8290c8308 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -94,9 +94,20 @@ void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   while (!msg_queue.empty()) {
     MQTTClient_message *message = msg_queue.front();
     std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    ConsumeMQTT::WriteCallback callback(message);
-    session->write(processFlowFile, &callback);
-    if (callback.status_ < 0) {
+    int write_status{};
+    session->write(processFlowFile, [message, &write_status](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+      if (message->payloadlen < 0) {
+        write_status = -1;
+        return -1;
+      }
+      const auto len = stream->write(reinterpret_cast<uint8_t*>(message->payload), gsl::narrow<size_t>(message->payloadlen));
+      if (io::isError(len)) {
+        write_status = -1;
+        return -1;
+      }
+      return gsl::narrow<int64_t>(len);
+    });
+    if (write_status < 0) {
       logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr());
       session->remove(processFlowFile);
     } else {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index f76c2af08..cad8461af 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -72,28 +72,6 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
 
   static core::Relationship Success;
 
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(MQTTClient_message *message)
-        : message_(message) {
-    }
-    MQTTClient_message *message_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (message_->payloadlen < 0) {
-        status_ = -1;
-        return -1;
-      }
-      const auto len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), gsl::narrow<size_t>(message_->payloadlen));
-      if (io::isError(len)) {
-        status_ = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    }
-    int status_ = 0;
-  };
-
  public:
   /**
    * Function that's executed when the processor is scheduled.
diff --git a/extensions/mqtt/processors/ConvertHeartBeat.cpp b/extensions/mqtt/processors/ConvertHeartBeat.cpp
index 5b7e14cd9..69446b5de 100644
--- a/extensions/mqtt/processors/ConvertHeartBeat.cpp
+++ b/extensions/mqtt/processors/ConvertHeartBeat.cpp
@@ -58,7 +58,7 @@ void ConvertHeartBeat::onTrigger(const std::shared_ptr<core::ProcessContext> &co
       minifi::utils::StreamOutputCallback byteCallback(serialized.size() + 1);
       byteCallback.write(const_cast<char*>(serialized.c_str()), serialized.size());
       auto newff = session->create();
-      session->write(newff, &byteCallback);
+      session->write(newff, std::ref(byteCallback));
       session->transfer(newff, Success);
       received_heartbeat = true;
     } else {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.cpp b/extensions/mqtt/processors/ConvertJSONAck.cpp
index d46c53124..1d44905ec 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.cpp
+++ b/extensions/mqtt/processors/ConvertJSONAck.cpp
@@ -33,14 +33,10 @@ std::string ConvertJSONAck::parseTopicName(const std::string &json) {
 
   try {
     rapidjson::ParseResult ok = root.Parse(json.c_str());
-    if (ok) {
-      if (root.HasMember("agentInfo")) {
-        if (root["agentInfo"].HasMember("identifier")) {
-          std::stringstream topicStr;
-          topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
-          return topicStr.str();
-        }
-      }
+    if (ok && root.HasMember("agentInfo") && root["agentInfo"].HasMember("identifier")) {
+      std::stringstream topicStr;
+      topicStr << root["agentInfo"]["identifier"].GetString() << "/in";
+      return topicStr.str();
     }
   } catch (...) {
   }
@@ -68,6 +64,7 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
     topic = parseTopicName(to_string(read_result));
     session->transfer(flow, Success);
   }
+
   flow = session->get();
   if (!flow) {
     return;
@@ -76,8 +73,8 @@ void ConvertJSONAck::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   if (!topic.empty()) {
     const auto read_result = session->readBuffer(flow);
     c2::C2Payload response_payload(c2::Operation::HEARTBEAT, state::UpdateState::READ_COMPLETE, true);
-    auto payload = parseJsonResponse(response_payload, read_result.buffer);
-    auto stream = c2::PayloadSerializer::serialize(1, payload);
+    const auto payload = parseJsonResponse(response_payload, read_result.buffer);
+    const auto stream = c2::PayloadSerializer::serialize(1, payload);
     mqtt_service_->send(topic, stream->getBuffer());
   }
 
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 49cccea35..bfe458b56 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -28,7 +28,6 @@
 #include "core/Core.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "ConvertBase.h"
-#include "utils/gsl.h"
 
 
 namespace org::apache::nifi::minifi::processors {
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 1a3686e8b..54cfd0aed 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -90,7 +90,7 @@ void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   }
 
   PublishMQTT::ReadCallback callback(flowFile->getSize(), max_seg_size_, topic_, client_, qos_, retain_, delivered_token_);
-  session->read(flowFile, &callback);
+  session->read(flowFile, std::ref(callback));
   if (callback.status_ < 0) {
     logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
     session->transfer(flowFile, Failure);
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 834a386f9..ff58f61a0 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -65,7 +65,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
   static core::Relationship Success;
 
   // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, int qos, bool retain, MQTTClient_deliveryToken &token)
         : flow_size_(flow_size),
@@ -78,8 +78,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       read_size_ = 0;
     }
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
       gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index 64bc44354..d430ec7e9 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -63,7 +63,7 @@ class FetchOPCProcessor : public BaseOPCProcessor {
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
 
-  void initialize(void) override;
+  void initialize() override;
 
  protected:
   bool nodeFoundCallBack(opc::Client& client, const UA_ReferenceDescription *ref, const std::string& path,
@@ -71,17 +71,6 @@ class FetchOPCProcessor : public BaseOPCProcessor {
 
   void OPCData2FlowFile(const opc::NodeData& opcnode, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
 
-  class WriteCallback : public OutputStreamCallback {
-    std::string data_;
-   public:
-    explicit WriteCallback(std::string&& data)
-      : data_(data) {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.c_str()), data_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
   std::string nodeID_;
   int32_t nameSpaceIdx_;
   opc::OPCNodeIDType idType_;
diff --git a/extensions/opc/src/fetchopc.cpp b/extensions/opc/src/fetchopc.cpp
index cedade7b8..60295436e 100644
--- a/extensions/opc/src/fetchopc.cpp
+++ b/extensions/opc/src/fetchopc.cpp
@@ -216,10 +216,9 @@ namespace processors {
     for (const auto& attr : opcnode.attributes) {
       flowFile->setAttribute(attr.first, attr.second);
     }
-    if (opcnode.data.size() > 0) {
+    if (!opcnode.data.empty()) {
       try {
-        FetchOPCProcessor::WriteCallback callback(opc::nodeValue2String(opcnode));
-        session->write(flowFile, &callback);
+        session->writeBuffer(flowFile, opc::nodeValue2String(opcnode));
       } catch (const std::exception& e) {
         std::string browsename;
         flowFile->getAttribute("Browsename", browsename);
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index bb04e8d50..6137b42fc 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -19,6 +19,7 @@
 
 #include <list>
 #include <memory>
+#include <set>
 #include <string>
 
 #include "opc.h"
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index 5b11ab76e..6c0350673 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -148,8 +148,6 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
   // retrieve a frame of your source
   if (video_capture_.read(frame)) {
     if (!frame.empty()) {
-      CaptureRTSPFrameWriteCallback write_cb(frame, image_encoding_);
-
       auto t = std::time(nullptr);
       auto tm = *std::localtime(&t);
 
@@ -161,7 +159,12 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
       session->putAttribute(flow_file, "filename", filename);
       session->putAttribute(flow_file, "video.backend.driver", video_backend_driver_);
 
-      session->write(flow_file, &write_cb);
+      session->write(flow_file, [&frame, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+        std::vector<uchar> image_buf;
+        imencode(image_encoding_, frame, image_buf);
+        const auto ret = output_stream->write(image_buf.data(), image_buf.size());
+        return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+      });
       session->transfer(flow_file, Success);
       logger_->log_info("A frame is captured");
     } else {
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index 3b9c72a8e..f0f472661 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -32,12 +32,7 @@
 #include "utils/gsl.h"
 #include "utils/Export.h"
 
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class CaptureRTSPFrame : public core::Processor {
  public:
@@ -65,25 +60,6 @@ class CaptureRTSPFrame : public core::Processor {
 
   void notifyStop() override;
 
-  class CaptureRTSPFrameWriteCallback : public OutputStreamCallback {
-   public:
-    explicit CaptureRTSPFrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
-        : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
-    }
-    ~CaptureRTSPFrameWriteCallback() override = default;
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      imencode(image_encoding_, image_mat_, image_buf_);
-      const auto ret = stream->write(image_buf_.data(), image_buf_.size());
-      return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
-    }
-
-   private:
-    std::vector<uchar> image_buf_;
-    cv::Mat image_mat_;
-    std::string image_encoding_;
-  };
-
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<CaptureRTSPFrame>::getLogger();
   std::mutex mutex_;
@@ -132,8 +108,4 @@ class CaptureRTSPFrame : public core::Processor {
 //  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service_;
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}   // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
deleted file mode 100644
index 75c6a90ff..000000000
--- a/extensions/opencv/FrameIO.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include <utility>
-#include <vector>
-#include <memory>
-#include <string>
-
-#include "io/StreamPipe.h"
-#include "utils/gsl.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace opencv {
-
-class FrameWriteCallback : public OutputStreamCallback {
- public:
-  explicit FrameWriteCallback(cv::Mat image_mat, std::string image_encoding)
-      : image_mat_(std::move(image_mat)), image_encoding_(image_encoding) {
-  }
-  ~FrameWriteCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    imencode(image_encoding_, image_mat_, image_buf_);
-    const auto ret = stream->write(image_buf_.data(), image_buf_.size());
-    return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
-  }
-
- private:
-  std::vector<uchar> image_buf_;
-  cv::Mat image_mat_;
-  std::string image_encoding_;
-};
-
-class FrameReadCallback : public InputStreamCallback {
- public:
-  explicit FrameReadCallback(cv::Mat &image_mat)
-      : image_mat_(image_mat) {
-  }
-  ~FrameReadCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    std::vector<uchar> image_buf;
-    image_buf.resize(stream->size());
-    const auto ret = stream->read(gsl::make_span(image_buf).as_span<std::byte>());
-    if (ret != stream->size()) {
-      throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
-    }
-    image_mat_ = cv::imdecode(image_buf, -1);
-    return gsl::narrow<int64_t>(ret);
-  }
-
- private:
-  cv::Mat &image_mat_;
-};
-
-} /* namespace opencv */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
index 05f94b0e8..3114b783a 100644
--- a/extensions/opencv/MotionDetector.cpp
+++ b/extensions/opencv/MotionDetector.cpp
@@ -20,7 +20,6 @@
 #include <vector>
 
 #include "MotionDetector.h"
-#include "FrameIO.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
@@ -163,8 +162,16 @@ void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   }
   cv::Mat frame;
 
-  opencv::FrameReadCallback cb(frame);
-  session->read(flow_file, &cb);
+  session->read(flow_file, [&frame](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    std::vector<uchar> image_buf;
+    image_buf.resize(input_stream->size());
+    const auto ret = input_stream->read(gsl::make_span(image_buf).as_span<std::byte>());
+    if (io::isError(ret) || ret != input_stream->size()) {
+      throw std::runtime_error("ImageReadCallback failed to fully read flow file input stream");
+    }
+    frame = cv::imdecode(image_buf, -1);
+    return gsl::narrow<int64_t>(ret);
+  });
 
   if (frame.empty()) {
     logger_->log_error("Empty frame.");
@@ -192,11 +199,14 @@ void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
 
   detectAndDraw(frame);
 
-  opencv::FrameWriteCallback write_cb(frame, image_encoding_);
-
   session->putAttribute(flow_file, "filename", filename);
 
-  session->write(flow_file, &write_cb);
+  session->write(flow_file, [&frame, this](const auto& output_stream) -> int64_t {
+    std::vector<uchar> image_buf;
+    imencode(image_encoding_, frame, image_buf);
+    const auto ret = output_stream->write(image_buf.data(), image_buf.size());
+    return io::isError(ret) ? -1 : gsl::narrow<int64_t>(ret);
+  });
   session->transfer(flow_file, Success);
   logger_->log_trace("Finish motion detecting");
 }
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 5c76d603c..6625f42e1 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -66,11 +66,8 @@ extern "C" {
 #define WSMAN_CUSTOM_ACTION_HEARTBEAT "http://schemas.dmtf.org/wbem/wsman/1/wsman/Heartbeat"
 #define WSMAN_CUSTOM_ACTION_EVENTS "http://schemas.dmtf.org/wbem/wsman/1/wsman/Events"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
+
 core::Property SourceInitiatedSubscriptionListener::ListenHostname(
     core::PropertyBuilder::createProperty("Listen Hostname")->withDescription("The hostname or IP of this machine that will be advertised to event sources to connect to. "
                                                                               "It must be contained as a Subject Alternative Name in the server certificate, "
@@ -602,15 +599,6 @@ bool SourceInitiatedSubscriptionListener::Handler::handleSubscriptionManager(str
   return true;
 }
 
-SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char* text)
-    : text_(text) {
-}
-
-int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNodeH node, void* data) {
   if (data == nullptr) {
     return 1;
@@ -636,8 +624,7 @@ int SourceInitiatedSubscriptionListener::Handler::enumerateEventCallback(WsXmlNo
       return 1;
     }
 
-    WriteCallback callback(text);
-    session->write(flow_file, &callback);
+    session->writeBuffer(flow_file, std::string_view{text});
 
     session->putAttribute(flow_file, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
     flow_file->addAttribute(ATTRIBUTE_WEF_REMOTE_MACHINEID, machine_id);
@@ -906,8 +893,4 @@ void SourceInitiatedSubscriptionListener::notifyStop() {
 REGISTER_RESOURCE(SourceInitiatedSubscriptionListener, "This processor implements a Windows Event Forwarding Source Initiated Subscription server with the help of OpenWSMAN. "
                                                        "Windows hosts can be set up to connect and forward Event Logs to this processor.");
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index 6b84ef973..927e81960 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -39,11 +39,7 @@ extern "C" {
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class SourceInitiatedSubscriptionListener : public core::Processor {
  public:
@@ -86,16 +82,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
   class Handler: public CivetHandler {
    public:
     explicit Handler(SourceInitiatedSubscriptionListener& processor);
-    bool handlePost(CivetServer* server, struct mg_connection* conn);
-
-    class WriteCallback : public OutputStreamCallback {
-     public:
-      explicit WriteCallback(char* text);
-      int64_t process(const std::shared_ptr<io::BaseStream>& stream);
-
-     private:
-      char* text_;
-    };
+    bool handlePost(CivetServer* server, struct mg_connection* conn) override;
 
    private:
     SourceInitiatedSubscriptionListener& processor_;
@@ -163,8 +150,4 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
   bool loadState();
 };
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/pdh/PerformanceDataMonitor.cpp b/extensions/pdh/PerformanceDataMonitor.cpp
index 71c9618e5..4e1461d5c 100644
--- a/extensions/pdh/PerformanceDataMonitor.cpp
+++ b/extensions/pdh/PerformanceDataMonitor.cpp
@@ -123,11 +123,11 @@ void PerformanceDataMonitor::onTrigger(core::ProcessContext* context, core::Proc
   }
   if (pretty_output_) {
     utils::PrettyJsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else {
     utils::JsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   }
 }
diff --git a/extensions/procfs/processors/ProcFsMonitor.cpp b/extensions/procfs/processors/ProcFsMonitor.cpp
index 485da9f03..992d6edcd 100644
--- a/extensions/procfs/processors/ProcFsMonitor.cpp
+++ b/extensions/procfs/processors/ProcFsMonitor.cpp
@@ -120,11 +120,11 @@ void ProcFsMonitor::onTrigger(core::ProcessContext*, core::ProcessSession* sessi
 
   if (output_compactness_ == OutputCompactness::PRETTY) {
     utils::PrettyJsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else if (output_compactness_ == OutputCompactness::COMPACT) {
     utils::JsonOutputCallback callback(std::move(root), decimal_places_);
-    session->write(flowFile, &callback);
+    session->write(flowFile, std::ref(callback));
     session->transfer(flowFile, Success);
   } else {
     throw Exception(GENERAL_EXCEPTION, "Invalid output compactness");
diff --git a/extensions/script/lua/LuaProcessSession.cpp b/extensions/script/lua/LuaProcessSession.cpp
index 4e20bb840..7b9916b8c 100644
--- a/extensions/script/lua/LuaProcessSession.cpp
+++ b/extensions/script/lua/LuaProcessSession.cpp
@@ -74,8 +74,10 @@ void LuaProcessSession::read(const std::shared_ptr<script::ScriptFlowFile> &scri
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  LuaInputStreamCallback lua_callback(input_stream_callback);
-  session_->read(flow_file, &lua_callback);
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    sol::function callback = input_stream_callback["process"];
+    return callback(input_stream_callback, std::make_shared<LuaBaseStream>(input_stream));
+  });
 }
 
 void LuaProcessSession::write(const std::shared_ptr<script::ScriptFlowFile> &script_flow_file,
@@ -90,8 +92,10 @@ void LuaProcessSession::write(const std::shared_ptr<script::ScriptFlowFile> &scr
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  LuaOutputStreamCallback lua_callback(output_stream_callback);
-  session_->write(flow_file, &lua_callback);
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    sol::function callback = output_stream_callback["process"];
+    return callback(output_stream_callback, std::make_shared<LuaBaseStream>(output_stream));
+  });
 }
 
 std::shared_ptr<script::ScriptFlowFile> LuaProcessSession::create() {
diff --git a/extensions/script/lua/LuaProcessSession.h b/extensions/script/lua/LuaProcessSession.h
index 48e387307..27c4709e1 100644
--- a/extensions/script/lua/LuaProcessSession.h
+++ b/extensions/script/lua/LuaProcessSession.h
@@ -53,38 +53,6 @@ class LuaProcessSession {
    */
   void releaseCoreResources();
 
-  class LuaInputStreamCallback : public InputStreamCallback {
-   public:
-    explicit LuaInputStreamCallback(const sol::table &input_stream_callback) {
-      lua_callback_ = input_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto lua_stream = std::make_shared<LuaBaseStream>(stream);
-      sol::function callback = lua_callback_["process"];
-      return callback(lua_callback_, lua_stream);
-    }
-
-   private:
-    sol::table lua_callback_;
-  };
-
-  class LuaOutputStreamCallback : public OutputStreamCallback {
-   public:
-    explicit LuaOutputStreamCallback(const sol::table &output_stream_callback) {
-      lua_callback_ = output_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto lua_stream = std::make_shared<LuaBaseStream>(stream);
-      sol::function callback = lua_callback_["process"];
-      return callback(lua_callback_, lua_stream);
-    }
-
-   private:
-    sol::table lua_callback_;
-  };
-
  private:
   std::vector<std::shared_ptr<script::ScriptFlowFile>> flow_files_;
   std::shared_ptr<core::ProcessSession> session_;
diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp
index b0a5d36a2..78b92dd57 100644
--- a/extensions/script/python/PyProcessSession.cpp
+++ b/extensions/script/python/PyProcessSession.cpp
@@ -80,8 +80,9 @@ void PyProcessSession::read(std::shared_ptr<script::ScriptFlowFile> script_flow_
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  PyInputStreamCallback py_callback(input_stream_callback);
-  session_->read(flow_file, &py_callback);
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+    return input_stream_callback.attr("process")(std::make_shared<PyBaseStream>(input_stream)).cast<int64_t>();
+  });
 }
 
 void PyProcessSession::write(std::shared_ptr<script::ScriptFlowFile> script_flow_file,
@@ -96,8 +97,9 @@ void PyProcessSession::write(std::shared_ptr<script::ScriptFlowFile> script_flow
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  PyOutputStreamCallback py_callback(output_stream_callback);
-  session_->write(flow_file, &py_callback);
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+    return output_stream_callback.attr("process")(std::make_shared<PyBaseStream>(output_stream)).cast<int64_t>();
+  });
 }
 
 std::shared_ptr<script::ScriptFlowFile> PyProcessSession::create() {
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 1d6867eac..2cdee8a3e 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -58,36 +58,6 @@ class PyProcessSession {
    */
   void releaseCoreResources();
 
-  class PyInputStreamCallback : public InputStreamCallback {
-   public:
-    explicit PyInputStreamCallback(const py::object &input_stream_callback) {
-      py_callback_ = input_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto py_stream = std::make_shared<PyBaseStream>(stream);
-      return py_callback_.attr("process")(py_stream).cast<int64_t>();
-    }
-
-   private:
-    py::object py_callback_;
-  };
-
-  class PyOutputStreamCallback : public OutputStreamCallback {
-   public:
-    explicit PyOutputStreamCallback(const py::object &output_stream_callback) {
-      py_callback_ = output_stream_callback;
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      auto py_stream = std::make_shared<PyBaseStream>(stream);
-      return py_callback_.attr("process")(py_stream).cast<int64_t>();
-    }
-
-   private:
-    py::object py_callback_;
-  };
-
  private:
   std::vector<std::shared_ptr<script::ScriptFlowFile>> flow_files_;
   std::shared_ptr<core::ProcessSession> session_;
diff --git a/extensions/sensors/GetEnvironmentalSensors.cpp b/extensions/sensors/GetEnvironmentalSensors.cpp
index 900095f41..7c62cde10 100644
--- a/extensions/sensors/GetEnvironmentalSensors.cpp
+++ b/extensions/sensors/GetEnvironmentalSensors.cpp
@@ -137,8 +137,7 @@ void GetEnvironmentalSensors::onTrigger(const std::shared_ptr<core::ProcessConte
   }
 
   if (have_sensor) {
-    WriteCallback callback("GetEnvironmentalSensors");
-    session->write(flow_file_, &callback);
+    session->writeBuffer(flow_file_, "GetEnvironmentalSensors");
     session->transfer(flow_file_, Success);
   }
 }
diff --git a/extensions/sensors/GetMovementSensors.cpp b/extensions/sensors/GetMovementSensors.cpp
index 7e15f1a57..3340bf665 100644
--- a/extensions/sensors/GetMovementSensors.cpp
+++ b/extensions/sensors/GetMovementSensors.cpp
@@ -85,8 +85,7 @@ void GetMovementSensors::onTrigger(const std::shared_ptr<core::ProcessContext>&
       logger_->log_trace("Could not read gyroscope");
     }
 
-    WriteCallback callback("GetMovementSensors");
-    session->write(flow_file_, &callback);
+    session->writeBuffer(flow_file_, "GetMovementSensors");
     session->transfer(flow_file_, Success);
   }
 }
diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h
index 6a889b743..b27a8e30f 100644
--- a/extensions/sensors/SensorBase.h
+++ b/extensions/sensors/SensorBase.h
@@ -33,39 +33,18 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-// SensorBase Class
 class SensorBase : public core::Processor {
  public:
-  // Constructor
-  /*!
-   * Create a new processor
-   */
   explicit SensorBase(const std::string& name, const utils::Identifier& uuid = {})
     : Processor(name, uuid) {
   }
-  // Destructor
   ~SensorBase() override;
-  // Processor Name
   static core::Relationship Success;
-  // Supported Properties
 
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(std::string data)
-        : data_{std::move(data)} {
-    }
-    std::string data_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (data_.empty()) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_.data()), data_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
-
  protected:
   std::optional<RTIMUSettings> settings_;
   std::unique_ptr<RTIMU> imu_;
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
index 1dac1d8bf..322bcd402 100644
--- a/extensions/sftp/processors/FetchSFTP.cpp
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -158,21 +158,6 @@ void FetchSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   startKeepaliveThreadIfNeeded();
 }
 
-FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
-                                    utils::SFTPClient& client)
-    : remote_file_(remote_file)
-    , client_(client) {
-}
-
-FetchSFTP::WriteCallback::~WriteCallback() = default;
-
-int64_t FetchSFTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!client_.getFile(remote_file_, *stream)) {
-    throw utils::SFTPException{client_.getLastError()};
-  }
-  return stream->size();
-}
-
 void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   auto flow_file = session->get();
   if (flow_file == nullptr) {
@@ -220,9 +205,13 @@ void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   };
 
   /* Download file */
-  WriteCallback write_callback(remote_file, *client);
   try {
-    session->write(flow_file, &write_callback);
+    session->write(flow_file, [&remote_file, &client](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+      if (!client->getFile(remote_file, *stream)) {
+        throw utils::SFTPException{client->getLastError()};
+      }
+      return gsl::narrow<int64_t>(stream->size());
+    });
   } catch (const utils::SFTPException& ex) {
     logger_->log_debug(ex.what());
     switch (ex.error().value()) {
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
index 5270d4b0c..b8b62a2de 100644
--- a/extensions/sftp/processors/FetchSFTP.h
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -74,19 +74,6 @@ class FetchSFTP : public SFTPProcessorBase {
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(const std::string& remote_file,
-                 utils::SFTPClient& client);
-    ~WriteCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<FetchSFTP::WriteCallback>::getLogger();
-    const std::string remote_file_;
-    utils::SFTPClient& client_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index a683aa901..d90687465 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -212,26 +212,6 @@ void PutSFTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, c
   startKeepaliveThreadIfNeeded();
 }
 
-PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
-                                    utils::SFTPClient& client,
-                                    const std::string& conflict_resolution)
-    : target_path_(target_path)
-    , client_(client)
-    , conflict_resolution_(conflict_resolution) {
-}
-
-PutSFTP::ReadCallback::~ReadCallback() = default;
-
-int64_t PutSFTP::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  if (!client_.putFile(target_path_,
-      *stream,
-      conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
-      stream->size() /*expected_size*/)) {
-    throw utils::SFTPException{client_.getLastError()};
-  }
-  return stream->size();
-}
-
 bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   auto flow_file = session->get();
   if (flow_file == nullptr) {
@@ -436,9 +416,16 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   std::string final_target_path = utils::file::concat_path(remote_path, resolved_filename, true /*force_posix*/);
   logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
 
-  ReadCallback read_callback(target_path.c_str(), *client, conflict_resolution_);
   try {
-    session->read(flow_file, &read_callback);
+    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::BaseStream>& stream) {
+      if (!client->putFile(target_path,
+          *stream,
+          conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
+          stream->size() /*expected_size*/)) {
+        throw utils::SFTPException{client->getLastError()};
+      }
+      return gsl::narrow<int64_t>(stream->size());
+    });
   } catch (const utils::SFTPException& ex) {
     logger_->log_debug(ex.what());
     session->transfer(flow_file, Failure);
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index 9f6ff3c16..e723becb3 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -89,21 +89,6 @@ class PutSFTP : public SFTPProcessorBase {
   void initialize() override;
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
 
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(const std::string& target_path,
-        utils::SFTPClient& client,
-        const std::string& conflict_resolution);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutSFTP::ReadCallback>::getLogger();
-    const std::string target_path_;
-    utils::SFTPClient& client_;
-    const std::string conflict_resolution_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
index a58ef713f..224715036 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -30,6 +30,7 @@
 #include "client/HTTPClient.h"
 #include "utils/HTTPClient.h"
 #include "utils/OptionalUtils.h"
+#include "utils/ByteArrayCallback.h"
 
 #include "rapidjson/document.h"
 
@@ -131,9 +132,9 @@ void setFlowFileAsPayload(core::ProcessSession& session,
                           core::ProcessContext& context,
                           utils::HTTPClient& client,
                           const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file,
-                          utils::ByteInputCallBack& payload_callback,
+                          utils::ByteInputCallback& payload_callback,
                           utils::HTTPUploadCallback& payload_callback_obj) {
-  session.read(flow_file, &payload_callback);
+  session.read(flow_file, std::ref(payload_callback));
   payload_callback_obj.ptr = &payload_callback;
   payload_callback_obj.pos = 0;
   client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
@@ -160,7 +161,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte
   utils::HTTPClient client;
   initializeClient(client, getNetworkLocation().append(getEndpoint(*context, flow_file, client)), getSSLContextService(*context));
 
-  utils::ByteInputCallBack payload_callback;
+  utils::ByteInputCallback payload_callback;
   utils::HTTPUploadCallback payload_callback_obj;
   setFlowFileAsPayload(*session, *context, client, flow_file, payload_callback, payload_callback_obj);
 
diff --git a/extensions/sql/processors/FlowFileSource.cpp b/extensions/sql/processors/FlowFileSource.cpp
index 49c5affd4..f7d7dffd0 100644
--- a/extensions/sql/processors/FlowFileSource.cpp
+++ b/extensions/sql/processors/FlowFileSource.cpp
@@ -53,11 +53,10 @@ void FlowFileSource::FlowFileGenerator::endProcessBatch() {
     return;
   }
 
-  OutputStreamPipe writer{std::make_shared<io::BufferStream>(json_writer_.toString())};
   auto new_flow = session_.create();
   new_flow->addAttribute(FRAGMENT_INDEX, std::to_string(flow_files_.size()));
   new_flow->addAttribute(FRAGMENT_IDENTIFIER, batch_id_.to_string());
-  session_.write(new_flow, &writer);
+  session_.writeBuffer(new_flow, json_writer_.toString());
   flow_files_.push_back(std::move(new_flow));
 }
 
diff --git a/extensions/standard-processors/processors/AttributesToJSON.cpp b/extensions/standard-processors/processors/AttributesToJSON.cpp
index 69dd573d6..49b24ae48 100644
--- a/extensions/standard-processors/processors/AttributesToJSON.cpp
+++ b/extensions/standard-processors/processors/AttributesToJSON.cpp
@@ -167,8 +167,7 @@ void AttributesToJSON::onTrigger(core::ProcessContext* /*context*/, core::Proces
     session->transfer(flow_file, Success);
   } else {
     logger_->log_debug("Writing the following attribute data to flowfile: %s", json_data);
-    AttributesToJSON::WriteCallback callback(json_data);
-    session->write(flow_file, &callback);
+    session->writeBuffer(flow_file, json_data);
     session->transfer(flow_file, Success);
   }
 }
diff --git a/extensions/standard-processors/processors/AttributesToJSON.h b/extensions/standard-processors/processors/AttributesToJSON.h
index 35149adad..b085130b8 100644
--- a/extensions/standard-processors/processors/AttributesToJSON.h
+++ b/extensions/standard-processors/processors/AttributesToJSON.h
@@ -72,17 +72,6 @@ class AttributesToJSON : public core::Processor {
   }
 
  private:
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(const std::string& json_data) : json_data_(json_data) {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(json_data_.data()), json_data_.length());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-   private:
-    std::string json_data_;
-  };
-
   bool isCoreAttributeToBeFiltered(const std::string& attribute) const;
   std::optional<std::unordered_set<std::string>> getAttributesToBeWritten(const core::FlowFile::AttributeMap& flowfile_attributes) const;
   void addAttributeToJson(rapidjson::Document& document, const std::string& key, const std::optional<std::string>& value);
diff --git a/extensions/standard-processors/processors/DefragmentText.cpp b/extensions/standard-processors/processors/DefragmentText.cpp
index 3d5411e0a..c6c23d0df 100644
--- a/extensions/standard-processors/processors/DefragmentText.cpp
+++ b/extensions/standard-processors/processors/DefragmentText.cpp
@@ -165,19 +165,6 @@ void DefragmentText::updateAttributesForSplitFiles(const core::FlowFile& origina
 }
 
 namespace {
-class AppendFlowFileToFlowFile : public OutputStreamCallback {
- public:
-  explicit AppendFlowFileToFlowFile(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append, PayloadSerializer& serializer)
-      : flow_file_to_append_(flow_file_to_append), serializer_(serializer) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    return serializer_.serialize(flow_file_to_append_, stream);
-  }
- private:
-  const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file_to_append_;
-  PayloadSerializer& serializer_;
-};
-
 void updateAppendedAttributes(core::FlowFile& buffered_ff) {
   std::string base_name, post_name, offset_str;
   if (!buffered_ff.getAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, base_name))
@@ -192,18 +179,6 @@ void updateAppendedAttributes(core::FlowFile& buffered_ff) {
   buffered_ff.setAttribute(core::SpecialFlowAttribute::FILENAME, buffer_new_name);
 }
 
-struct ReadFlowFileContent : public InputStreamCallback {
-  std::string content;
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    content.resize(stream->size());
-    const auto ret = stream->read(gsl::make_span(content).as_span<std::byte>());
-    if (io::isError(ret))
-      return -1;
-    return gsl::narrow<int64_t>(ret);
-  }
-};
-
 size_t getSplitPosition(const utils::SMatch& last_match, DefragmentText::PatternLocation pattern_location) {
   size_t split_position = last_match.position(0);
   if (pattern_location == DefragmentText::PatternLocation::END_OF_MESSAGE) {
@@ -218,9 +193,8 @@ bool DefragmentText::splitFlowFileAtLastPattern(core::ProcessSession *session,
                                                 const gsl::not_null<std::shared_ptr<core::FlowFile>> &original_flow_file,
                                                 std::shared_ptr<core::FlowFile> &split_before_last_pattern,
                                                 std::shared_ptr<core::FlowFile> &split_after_last_pattern) const {
-  ReadFlowFileContent read_flow_file_content;
-  session->read(original_flow_file, &read_flow_file_content);
-  auto last_regex_match = utils::getLastRegexMatch(read_flow_file_content.content, pattern_);
+  const auto read_result = session->readBuffer(original_flow_file);
+  auto last_regex_match = utils::getLastRegexMatch(to_string(read_result), pattern_);
   if (!last_regex_match.ready()) {
     split_before_last_pattern = session->clone(original_flow_file);
     split_after_last_pattern = nullptr;
@@ -256,13 +230,14 @@ void DefragmentText::Buffer::append(core::ProcessSession* session, const gsl::no
     store(session, flow_file_to_append);
     return;
   }
-  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, const io::InputStreamCallback& cb) {
     return session->read(ff, cb);
   };
   PayloadSerializer serializer(flowFileReader);
-  AppendFlowFileToFlowFile append_flow_file_to_flow_file(flow_file_to_append, serializer);
   session->add(buffered_flow_file_);
-  session->append(buffered_flow_file_, &append_flow_file_to_flow_file);
+  session->append(buffered_flow_file_, [&serializer, &flow_file_to_append](const auto& output_stream) -> int64_t {
+    return serializer.serialize(flow_file_to_append, output_stream);
+  });
   updateAppendedAttributes(*buffered_flow_file_);
   session->transfer(buffered_flow_file_, Self);
 
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 29d444d02..b1b5bfba2 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -166,13 +166,12 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
             if (numRead <= 0)
               break;
             logger_->log_debug("Execute Command Respond %zd", numRead);
-            ExecuteProcess::WriteCallback callback(buffer, gsl::narrow<uint64_t>(numRead));
             auto flowFile = session->create();
             if (!flowFile)
               continue;
             flowFile->addAttribute("command", _command);
             flowFile->addAttribute("command.arguments", _commandArgument);
-            session->write(flowFile, &callback);
+            session->writeBuffer(flowFile, gsl::make_span(buffer, gsl::narrow<size_t>(numRead)));
             session->transfer(flowFile, Success);
             session->commit();
           }
@@ -187,16 +186,16 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               if (totalRead > 0) {
                 logger_->log_debug("Execute Command Respond %zu", totalRead);
                 // child exits and close the pipe
-                ExecuteProcess::WriteCallback callback(buffer, totalRead);
+                const auto buffer_span = gsl::make_span(buffer, totalRead);
                 if (!flowFile) {
                   flowFile = session->create();
                   if (!flowFile)
                     break;
                   flowFile->addAttribute("command", _command);
                   flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->write(flowFile, &callback);
+                  session->writeBuffer(flowFile, buffer_span);
                 } else {
-                  session->append(flowFile, &callback);
+                  session->appendBuffer(flowFile, buffer_span);
                 }
                 session->transfer(flowFile, Success);
               }
@@ -205,16 +204,15 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
                 // we reach the max buffer size
                 logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
-                ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
                   flowFile = session->create();
                   if (!flowFile)
                     continue;
                   flowFile->addAttribute("command", _command);
                   flowFile->addAttribute("command.arguments", _commandArgument);
-                  session->write(flowFile, &callback);
+                  session->writeBuffer(flowFile, buffer);
                 } else {
-                  session->append(flowFile, &callback);
+                  session->appendBuffer(flowFile, buffer);
                 }
                 // Rewind
                 totalRead = 0;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 9960c7528..b6e14a9d3 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -80,23 +80,6 @@ class ExecuteProcess : public core::Processor {
   // Supported Relationships
   static core::Relationship Success;
 
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    WriteCallback(char *data, uint64_t size)
-        : _data(data),
-          _dataSize(size) {
-    }
-    char *_data;
-    uint64_t _dataSize;
-    // void process(std::ofstream *stream) {
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (!_data || _dataSize <= 0) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
-
  public:
   // OnTrigger method, implemented by NiFi ExecuteProcess
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 68cd6d4e3..6febe23e3 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -107,15 +107,16 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
     return;
   }
 
-  ReadCallback cb(flowFile, context, logger_);
-  session->read(flowFile, &cb);
+  session->read(flowFile, ReadCallback{flowFile, context, logger_});
   session->transfer(flowFile, Success);
 }
 
-int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
   size_t read_size = 0;
   bool regex_mode;
   size_t size_limit = flowFile_->getSize();
+  std::vector<std::byte> buffer;
+  buffer.resize(std::min(gsl::narrow<size_t>(flowFile_->getSize()), MAX_BUFFER_SIZE));
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -131,8 +132,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
 
   while (read_size < size_limit) {
     // Don't read more than config limit or the size of the buffer
-    const auto length = std::min(size_limit - read_size, buffer_.size());
-    const auto ret = stream->read(gsl::make_span(buffer_).subspan(0, length));
+    const auto length = std::min(size_limit - read_size, buffer.size());
+    const auto ret = stream->read(gsl::make_span(buffer).subspan(0, length));
 
     if (io::isError(ret)) {
       return -1;  // Stream error
@@ -140,7 +141,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
       break;  // End of stream, no more data
     }
 
-    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
+    contentStream.write(reinterpret_cast<const char*>(buffer.data()), gsl::narrow<std::streamsize>(ret));
     read_size += ret;
     if (contentStream.fail()) {
       return -1;
@@ -219,7 +220,6 @@ ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile
     : flowFile_(std::move(flowFile)),
       ctx_(ctx),
       logger_(std::move(lgr)) {
-  buffer_.resize(std::min(gsl::narrow<size_t>(flowFile_->getSize()), MAX_BUFFER_SIZE));
 }
 
 REGISTER_RESOURCE(ExtractText, "Extracts the content of a FlowFile and places it into an attribute.");
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index 57cf788a5..ca84a7cdb 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -68,16 +68,14 @@ class ExtractText : public core::Processor {
     return true;
   }
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<core::logging::Logger> lgr);
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
 
    private:
     std::shared_ptr<core::FlowFile> flowFile_;
     core::ProcessContext *ctx_;
-    std::vector<std::byte> buffer_;
     std::shared_ptr<core::logging::Logger> logger_;
   };
 
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.cpp b/extensions/standard-processors/processors/GenerateFlowFile.cpp
index 654e5ff1f..7dd9d6423 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.cpp
+++ b/extensions/standard-processors/processors/GenerateFlowFile.cpp
@@ -149,11 +149,9 @@ void GenerateFlowFile::onTrigger(core::ProcessContext* /*context*/, core::Proces
       if (fileSize_ > 0) {
         generateData(data, textData_);
       }
-      GenerateFlowFile::WriteCallback callback(data);
-      session->write(flowFile, &callback);
+      session->writeBuffer(flowFile, data);
     } else {
-      GenerateFlowFile::WriteCallback callback(data_);
-      session->write(flowFile, &callback);
+      session->writeBuffer(flowFile, data_);
     }
     session->transfer(flowFile, Success);
   }
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 343e7330f..5788e68aa 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -64,19 +64,6 @@ class GenerateFlowFile : public core::Processor {
   EXTENSIONAPI static const char *DATA_FORMAT_TEXT;
   // Supported Relationships
   EXTENSIONAPI static core::Relationship Success;
-  // Nest Callback Class for write stream
-  class WriteCallback : public OutputStreamCallback {
-   public:
-    explicit WriteCallback(const std::vector<char>& data)
-        :data_(&data)
-    { }
-    gsl::not_null<const std::vector<char>*> data_;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (data_->empty()) return 0;
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(data_->data()), data_->size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-  };
 
  public:
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
diff --git a/extensions/standard-processors/processors/GetFile.cpp b/extensions/standard-processors/processors/GetFile.cpp
index 721860f36..2d59cdc25 100644
--- a/extensions/standard-processors/processors/GetFile.cpp
+++ b/extensions/standard-processors/processors/GetFile.cpp
@@ -189,8 +189,7 @@ void GetFile::getSingleFile(core::ProcessSession& session, const std::string& fi
   flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, file_name);
 
   try {
-    utils::FileReaderCallback file_reader_callback{file_name};
-    session.write(flow_file, &file_reader_callback);
+    session.write(flow_file, utils::FileReaderCallback{file_name});
     session.transfer(flow_file, Success);
     if (!request_.keepSourceFile) {
       auto remove_status = remove(file_name.c_str());
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index 0e3e42241..f3aa31314 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -85,9 +85,7 @@ int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, b
   std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
   std::shared_ptr<core::FlowFile> flowFile = my_session->create();
 
-  DataHandlerCallback callback(message, size);
-
-  my_session->write(flowFile, &callback);
+  my_session->writeBuffer(flowFile, gsl::make_span(reinterpret_cast<const std::byte*>(message), size));
 
   my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
 
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index 5ab1630a2..53845e6b7 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -89,25 +89,6 @@ class SocketAfterExecute : public utils::AfterExecute<int> {
   std::map<std::string, std::future<int>*> *list_;
 };
 
-class DataHandlerCallback : public OutputStreamCallback {
- public:
-  DataHandlerCallback(uint8_t *message, size_t size)
-      : message_(message),
-        size_(size) {
-  }
-
-  ~DataHandlerCallback() override = default;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    const auto write_ret = stream->write(message_, size_);
-    return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-  }
-
- private:
-  uint8_t *message_;
-  size_t size_;
-};
-
 class DataHandler {
  public:
   DataHandler(std::shared_ptr<core::ProcessSessionFactory> sessionFactory) // NOLINT
diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp
index f2237a321..58007fe30 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -25,6 +25,7 @@
 #include <memory>
 #include <set>
 #include <string>
+#include <utility>
 
 #include "HashContent.h"
 #include "core/ProcessContext.h"
@@ -91,28 +92,20 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
   }
 
   logger_->log_trace("attempting read");
-  ReadCallback cb(flowFile, *this);
-  session->read(flowFile, &cb);
-  session->transfer(flowFile, Success);
-}
-
-int64_t HashContent::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  // This throws in case algo is not found, but that's fine
-  parent_.logger_->log_trace("Searching for %s", parent_.algoName_);
-  auto algo = HashAlgos.at(parent_.algoName_);
+  session->read(flowFile, [&flowFile, this](const std::shared_ptr<io::BaseStream>& stream) {
+    // This throws in case algo is not found, but that's fine
+    logger_->log_trace("Searching for %s", algoName_);
+    auto algo = HashAlgos.at(algoName_);
 
-  const auto& ret_val = algo(stream);
+    const auto& ret_val = algo(stream);
 
-  flowFile_->setAttribute(parent_.attrKey_, ret_val.first);
+    flowFile->setAttribute(attrKey_, ret_val.first);
 
-  return ret_val.second;
+    return ret_val.second;
+  });
+  session->transfer(flowFile, Success);
 }
 
-HashContent::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent)
-  : flowFile_(flowFile),
-    parent_(parent)
-  {}
-
 REGISTER_RESOURCE(HashContent,"HashContent calculates the checksum of the content of the flowfile and adds it as an attribute. Configuration options exist to select hashing algorithm and set the name of the attribute."); // NOLINT
 
 }  // namespace processors
diff --git a/extensions/standard-processors/processors/HashContent.h b/extensions/standard-processors/processors/HashContent.h
index 66646999c..ce0089f22 100644
--- a/extensions/standard-processors/processors/HashContent.h
+++ b/extensions/standard-processors/processors/HashContent.h
@@ -153,17 +153,6 @@ class HashContent : public core::Processor {
   //! Initialize, over write by NiFi HashContent
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
-    ~ReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    std::shared_ptr<core::FlowFile> flowFile_;
-    const HashContent& parent_;
-  };
-
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
diff --git a/extensions/standard-processors/processors/LogAttribute.cpp b/extensions/standard-processors/processors/LogAttribute.cpp
index 6acffb933..3f3f8ab3a 100644
--- a/extensions/standard-processors/processors/LogAttribute.cpp
+++ b/extensions/standard-processors/processors/LogAttribute.cpp
@@ -144,14 +144,13 @@ void LogAttribute::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     }
     if (logPayload && flow->getSize() <= 1024 * 1024) {
       message << "\n" << "Payload:" << "\n";
-      ReadCallback callback(logger_, gsl::narrow<size_t>(flow->getSize()));
-      session->read(flow, &callback);
+      const auto read_result = session->readBuffer(flow);
 
       std::string printable_payload;
       if (hexencode_) {
-        printable_payload = utils::StringUtils::to_hex(callback.buffer_);
+        printable_payload = utils::StringUtils::to_hex(read_result.buffer);
       } else {
-        printable_payload = utils::span_to<std::string>(gsl::make_span(callback.buffer_).as_span<char>());
+        printable_payload = to_string(read_result);
       }
 
       if (max_line_length_ == 0U) {
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index f7f25da4d..92b30c0f4 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -95,25 +95,6 @@ class LogAttribute : public core::Processor {
       return false;
     }
   }
-  // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(std::shared_ptr<core::logging::Logger> logger, size_t size)
-        : logger_(std::move(logger))
-        , buffer_(size)  {
-    }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      if (buffer_.empty()) return 0U;
-      const auto ret = stream->read(buffer_);
-      if (ret != buffer_.size()) {
-        logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
-        throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
-      }
-      return gsl::narrow<int64_t>(buffer_.size());
-    }
-    std::shared_ptr<core::logging::Logger> logger_;
-    std::vector<std::byte> buffer_;
-  };
 
  public:
   void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override;
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 1e2896306..c6586f331 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -243,7 +243,7 @@ bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<core::FlowF
 
   if (flowFile->getSize() > 0) {
     ReadCallback cb(tmpFile, destFile);
-    session->read(flowFile, &cb);
+    session->read(flowFile, std::ref(cb));
     logger_->log_debug("Committing %s", destFile);
     success = cb.commit();
   } else {
@@ -314,7 +314,7 @@ PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
 }
 
 // Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutFile::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   write_succeeded_ = false;
   size_t size = 0;
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 31899e741..959da1cbe 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -79,11 +79,11 @@ class PutFile : public core::Processor {
   void onTrigger(core::ProcessContext *context, core::ProcessSession *session) override;
   void initialize() override;
 
-  class ReadCallback : public InputStreamCallback {
+  class ReadCallback {
    public:
     ReadCallback(std::string tmp_file, std::string dest_file);
-    ~ReadCallback() override;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    ~ReadCallback();
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
     bool commit();
 
    private:
diff --git a/extensions/standard-processors/processors/ReplaceText.cpp b/extensions/standard-processors/processors/ReplaceText.cpp
index 4c9b5608d..cfd7daba1 100644
--- a/extensions/standard-processors/processors/ReplaceText.cpp
+++ b/extensions/standard-processors/processors/ReplaceText.cpp
@@ -175,43 +175,13 @@ ReplaceText::Parameters ReplaceText::readParameters(const std::shared_ptr<core::
   return parameters;
 }
 
-namespace {
-
-struct ReadFlowFileIntoBuffer : public InputStreamCallback {
-  std::vector<std::byte> buffer_;
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    buffer_.resize(stream->size());
-    size_t bytes_read = stream->read(buffer_);
-    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
-  }
-};
-
-struct WriteBufferToFlowFile : public OutputStreamCallback {
-  const std::vector<uint8_t>& buffer_;
-
-  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : buffer_(buffer) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    size_t bytes_written = stream->write(buffer_, buffer_.size());
-    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-
-}  // namespace
-
 void ReplaceText::replaceTextInEntireFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session, const Parameters& parameters) const {
   gsl_Expects(flow_file);
   gsl_Expects(session);
 
   try {
-    const auto read_result = session->readBuffer(flow_file);
-    std::string output = applyReplacements(to_string(read_result), flow_file, parameters);
-    std::vector<uint8_t> modified_text{output.begin(), output.end()};
-
-    WriteBufferToFlowFile write_callback{modified_text};
-    session->write(flow_file, &write_callback);
-
+    const auto input = to_string(session->readBuffer(flow_file));
+    session->writeBuffer(flow_file, applyReplacements(input, flow_file, parameters));
     session->transfer(flow_file, Success);
   } catch (const Exception& exception) {
     logger_->log_error("Error in ReplaceText (Entire text mode): %s", exception.what());
@@ -239,7 +209,7 @@ void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& f
       }
       throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Unsupported ", LineByLineEvaluationMode.getName(), ": ", line_by_line_evaluation_mode_.toString())};
     }};
-    session->readWrite(flow_file, &read_write_callback);
+    session->readWrite(flow_file, std::move(read_write_callback));
     session->transfer(flow_file, Success);
   } catch (const Exception& exception) {
     logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", exception.what());
diff --git a/extensions/standard-processors/processors/RouteText.cpp b/extensions/standard-processors/processors/RouteText.cpp
index 3c97339f0..c14632ede 100644
--- a/extensions/standard-processors/processors/RouteText.cpp
+++ b/extensions/standard-processors/processors/RouteText.cpp
@@ -140,14 +140,14 @@ void RouteText::onSchedule(core::ProcessContext* context, core::ProcessSessionFa
   context->getProperty(GroupingFallbackValue.getName(), group_fallback_);
 }
 
-class RouteText::ReadCallback : public InputStreamCallback {
+class RouteText::ReadCallback {
   using Fn = std::function<void(Segment)>;
 
  public:
   ReadCallback(Segmentation segmentation, size_t file_size, Fn&& fn)
     : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     std::vector<std::byte> buffer;
     buffer.resize(file_size_);
     size_t ret = stream->read(buffer);
@@ -364,7 +364,7 @@ void RouteText::onTrigger(core::ProcessContext *context, core::ProcessSession *s
     }
     throw Exception(PROCESSOR_EXCEPTION, "Unknown routing strategy");
   });
-  session->read(flow_file, &callback);
+  session->read(flow_file, std::move(callback));
 
   for (const auto& [route, content] : flow_file_contents) {
     auto new_flow_file = session->create(flow_file);
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 2e065a6ba..ef0f00307 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -211,7 +211,7 @@ void openFile(const std::string &file_name, uint64_t offset, std::ifstream &inpu
 
 constexpr std::size_t BUFFER_SIZE = 4096;
 
-class FileReaderCallback : public OutputStreamCallback {
+class FileReaderCallback {
  public:
   FileReaderCallback(const std::string &file_name,
                      uint64_t offset,
@@ -222,7 +222,7 @@ class FileReaderCallback : public OutputStreamCallback {
     openFile(file_name, offset, input_stream_, logger_);
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
 
     uint64_t num_bytes_written = 0;
@@ -282,7 +282,7 @@ class FileReaderCallback : public OutputStreamCallback {
   bool latest_flow_file_ends_with_delimiter_ = true;
 };
 
-class WholeFileReaderCallback : public OutputStreamCallback {
+class WholeFileReaderCallback {
  public:
   WholeFileReaderCallback(const std::string &file_name,
                           uint64_t offset,
@@ -295,7 +295,7 @@ class WholeFileReaderCallback : public OutputStreamCallback {
     return checksum_;
   }
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
     std::array<char, BUFFER_SIZE> buffer;
 
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
@@ -796,7 +796,7 @@ void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &se
 
     while (file_reader.hasMoreToRead()) {
       auto flow_file = session->create();
-      session->write(flow_file, &file_reader);
+      session->write(flow_file, std::ref(file_reader));
 
       if (file_reader.useLatestFlowFile()) {
         updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
@@ -816,7 +816,7 @@ void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &se
   } else {
     WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_};
     auto flow_file = session->create();
-    session->write(flow_file, &file_reader);
+    session->write(flow_file, std::ref(file_reader));
 
     updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
     session->transfer(flow_file, Success);
diff --git a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
index 5398434a9..1f7c4e1e0 100644
--- a/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
+++ b/extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
@@ -319,8 +319,6 @@ class FragmentGenerator : public core::Processor {
   void onTrigger(core::ProcessContext*, core::ProcessSession* session) override {
     std::vector<core::FlowFile> flow_files;
     for (const size_t max_i = i_ + batch_size_; i_ < fragment_contents_.size() && i_ < max_i; ++i_) {
-      auto& fragment_content = fragment_contents_[i_];
-      WriteCallback callback(fragment_content);
       std::shared_ptr<core::FlowFile> flow_file = session->create();
       if (base_name_attribute_)
         flow_file->addAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, *base_name_attribute_);
@@ -329,8 +327,9 @@ class FragmentGenerator : public core::Processor {
       if (absolute_path_attribute_)
         flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, *absolute_path_attribute_);
       flow_file->addAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(offset_));
+      auto& fragment_content = fragment_contents_[i_];
       offset_ += fragment_content.size();
-      session->write(flow_file, &callback);
+      session->writeBuffer(flow_file, fragment_content);
       session->transfer(flow_file, Success);
     }
   }
@@ -347,17 +346,6 @@ class FragmentGenerator : public core::Processor {
   void clearBaseNameAttribute() { base_name_attribute_.reset(); }
 
  protected:
-  struct WriteCallback : public org::apache::nifi::minifi::OutputStreamCallback {
-    const gsl::span<const uint8_t> content_;
-
-    explicit WriteCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
-
-    int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> &stream) override {
-      size_t bytes_written = stream->write(content_.begin(), content_.size());
-      return org::apache::nifi::minifi::io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-    }
-  };
-
   size_t offset_ = 0;
   size_t batch_size_ = 1;
   size_t i_ = 0;
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index 42e21f553..279037848 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -109,25 +109,18 @@ void GetUSBCamera::onFrame(uvc_frame_t *frame, void *ptr) {
     flow_file->getAttribute("filename", flow_file_name);
     cb_data->logger->log_info("Created flow file: %s", flow_file_name);
 
-    // Initialize callback according to output format
-    std::shared_ptr<OutputStreamCallback> write_cb;
-
-    if (cb_data->format == "PNG") {
-      write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-                                                                  cb_data->frame_buffer,
-                                                                  cb_data->device_width,
-                                                                  cb_data->device_height);
-    } else if (cb_data->format == "RAW") {
-      write_cb = std::make_shared<GetUSBCamera::RawWriteCallback>(cb_data->frame_buffer);
+    if (cb_data->format == "RAW") {
+      session->writeBuffer(flow_file, gsl::make_span(static_cast<const std::byte*>(cb_data->frame_buffer->data), cb_data->frame_buffer->data_bytes));
     } else {
-      cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format);
-      write_cb = std::make_shared<GetUSBCamera::PNGWriteCallback>(cb_data->png_write_mtx,
-                                                                  cb_data->frame_buffer,
-                                                                  cb_data->device_width,
-                                                                  cb_data->device_height);
+      if (cb_data->format != "PNG") {
+        cb_data->logger->log_warn("Invalid format specified (%s); defaulting to PNG", cb_data->format);
+      }
+      session->write(flow_file, GetUSBCamera::PNGWriteCallback{
+          cb_data->png_write_mtx,
+          cb_data->frame_buffer,
+          cb_data->device_width,
+          cb_data->device_height});
     }
-
-    session->write(flow_file, write_cb.get());
     session->transfer(flow_file, GetUSBCamera::Success);
     session->commit();
   } catch (std::exception &exception) {
@@ -407,7 +400,7 @@ GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> wri
       height_(height) {
 }
 
-int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t GetUSBCamera::PNGWriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   std::lock_guard<std::mutex> lock(*png_write_mtx_);
   logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
   png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
@@ -467,16 +460,6 @@ int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseSt
   return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
 }
 
-GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
-    : frame_(frame) {
-}
-
-int64_t GetUSBCamera::RawWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  logger_->log_info("Writing %d bytes of raw capture data", frame_->data_bytes);
-  const auto write_ret = stream->write(reinterpret_cast<uint8_t*>(frame_->data), frame_->data_bytes);
-  return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-}
-
 REGISTER_RESOURCE(GetUSBCamera, "Gets images from USB Video Class (UVC)-compatible devices. Outputs one flow file per frame at the rate specified by the FPS property in the format specified by the Format property.");  // NOLINT line length
 
 } /* namespace processors */
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 49da8c64f..047fe1ec3 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -93,10 +93,10 @@ class GetUSBCamera : public core::Processor {
   static void onFrame(uvc_frame_t *frame, void *ptr);
 
   // Write callback for storing camera capture data in PNG format
-  class PNGWriteCallback : public OutputStreamCallback {
+  class PNGWriteCallback {
    public:
     PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
    private:
     std::shared_ptr<std::mutex> png_write_mtx_;
@@ -107,17 +107,6 @@ class GetUSBCamera : public core::Processor {
     std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PNGWriteCallback>::getLogger();
   };
 
-  // Write callback for storing camera capture data as a raw RGB pixel buffer
-  class RawWriteCallback : public OutputStreamCallback {
-   public:
-    explicit RawWriteCallback(uvc_frame_t *frame);
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
-
-   private:
-    uvc_frame_t *frame_;
-    std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RawWriteCallback>::getLogger();
-  };
-
  private:
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<GetUSBCamera>::getLogger();
   static std::shared_ptr<utils::IdGenerator> id_generator_;
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index 69f53402d..434036e75 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -627,29 +627,13 @@ void CollectorInitiatedSubscription::unsubscribe() {
 }
 
 int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::ProcessSession> &session) {
-  struct WriteCallback: public OutputStreamCallback {
-    explicit WriteCallback(const std::string& str)
-      : str_(&str) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_->data()), str_->size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-
-    gsl::not_null<const std::string*> str_;
-  };
-
   int flowFileCount = 0;
 
   std::string xml;
   while (renderedXMLs_.try_dequeue(xml)) {
     auto flowFile = session->create();
 
-    {
-      WriteCallback wc{ xml };
-      session->write(flowFile, &wc);
-    }
+    session->writeBuffer(flowFile, xml);
     session->putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
     session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
     session->transfer(flowFile, s_success);
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index a2c8e7e6b..7a0033340 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -693,24 +693,8 @@ void ConsumeWindowsEventLog::refreshTimeZoneData() {
 }
 
 void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender& eventRender, core::ProcessSession& session) const {
-  struct WriteCallback : public OutputStreamCallback {
-    explicit WriteCallback(const std::string& str)
-        : str_(str) {
-    }
-
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(str_.c_str()), str_.size());
-      return io::isError(write_ret) ? -1 : gsl::narrow<int64_t>(write_ret);
-    }
-
-    const std::string& str_;
-  };
-
   auto commitFlowFile = [&] (const std::shared_ptr<core::FlowFile>& flowFile, const std::string& content, const std::string& mimeType) {
-    {
-      WriteCallback wc{ content };
-      session.write(flowFile, &wc);
-    }
+    session.writeBuffer(flowFile, content);
     session.putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
     session.putAttribute(flowFile, "Timezone name", timezone_name_);
     session.putAttribute(flowFile, "Timezone offset", timezone_offset_);
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 171c446a6..ed8ba3355 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -95,24 +95,21 @@ class ProcessSession : public ReferenceContainer {
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
-  int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
-  int64_t read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback&& callback) {
-    return read(flow, &callback);
-  }
+  int64_t read(const std::shared_ptr<core::FlowFile> &flow, const io::InputStreamCallback& callback);
   // Read content into buffer
   detail::ReadBufferResult readBuffer(const std::shared_ptr<core::FlowFile>& flow);
   // Execute the given write callback against the content
-  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
-
-  void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback&& callback) {
-    return write(flow, &callback);
-  }
+  void write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback);
   // Read and write the flow file at the same time (eg. for processing it line by line)
-  int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback);
+  int64_t readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback);
   // Replace content with buffer
   void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer);
+  void writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer);
   // Execute the given write/append callback against the content
-  void append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
+  void append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback);
+  // Append buffer to content
+  void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, gsl::span<const char> buffer);
+  void appendBuffer(const std::shared_ptr<core::FlowFile>& flow, gsl::span<const std::byte> buffer);
   // Penalize the flow
   void penalize(const std::shared_ptr<core::FlowFile> &flow);
 
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h b/libminifi/include/core/ProcessSessionReadCallback.h
index 00a6f93af..19a57c8c5 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -32,12 +32,12 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace core {
-class ProcessSessionReadCallback : public InputStreamCallback {
+class ProcessSessionReadCallback {
  public:
   ProcessSessionReadCallback(const std::string &tmpFile, const std::string &destFile,
       std::shared_ptr<logging::Logger> logger);
   ~ProcessSessionReadCallback();
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
   bool commit();
 
  private:
diff --git a/libminifi/src/serialization/PayloadSerializer.cpp b/libminifi/include/io/StreamCallback.h
similarity index 57%
copy from libminifi/src/serialization/PayloadSerializer.cpp
copy to libminifi/include/io/StreamCallback.h
index 8bfa1128a..0116ea746 100644
--- a/libminifi/src/serialization/PayloadSerializer.cpp
+++ b/libminifi/include/io/StreamCallback.h
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,21 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#pragma once
+
+#include <functional>
+#include <memory>
 
-#include "serialization/PayloadSerializer.h"
-#include "core/ProcessSession.h"
+namespace org::apache::nifi::minifi::io {
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
+class BaseStream;
 
-int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
-  InputStreamPipe pipe(out);
-  return reader_(flowFile, &pipe);
-}
+// FlowFile IO Callback functions for input and output
+// throw exception for error
+using InputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream)>;
+using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& output_stream)>;
+using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream, const std::shared_ptr<BaseStream>& output_stream)>;
 
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index ab7236759..1e3a23a37 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -20,69 +20,26 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 #include <utility>
 #include "BaseStream.h"
+#include "StreamCallback.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-class InputStreamCallback {
- public:
-  virtual ~InputStreamCallback() = default;
-
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
-};
-class OutputStreamCallback {
- public:
-  virtual ~OutputStreamCallback() = default;
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
-};
-class InputOutputStreamCallback {
- public:
-  virtual ~InputOutputStreamCallback() = default;
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) = 0;
-};
-
-class FunctionOutputStreamCallback : public OutputStreamCallback {
- public:
-  explicit FunctionOutputStreamCallback(std::function<int64_t(const std::shared_ptr<io::OutputStream>&)> fn) : fn_(std::move(fn)) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return fn_(stream);
-  }
- private:
-  std::function<int64_t(const std::shared_ptr<io::OutputStream>&)> fn_;
-};
-
-class FunctionInputStreamCallback : public InputStreamCallback {
- public:
-  explicit FunctionInputStreamCallback(std::function<int64_t(const std::shared_ptr<io::InputStream>&)> fn) : fn_(std::move(fn)) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return fn_(stream);
-  }
- private:
-  std::function<int64_t(const std::shared_ptr<io::InputStream>&)> fn_;
-};
-
+namespace org::apache::nifi::minifi {
 namespace internal {
 
-inline int64_t pipe(io::InputStream* src, io::OutputStream* dst) {
+inline int64_t pipe(io::InputStream& src, io::OutputStream& dst) {
   std::array<std::byte, 4096> buffer{};
   int64_t totalTransferred = 0;
   while (true) {
-    const auto readRet = src->read(buffer);
+    const auto readRet = src.read(buffer);
     if (io::isError(readRet)) return -1;
     if (readRet == 0) break;
     auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
-      const auto writeRet = dst->write(gsl::make_span(buffer).subspan(transferred, remaining));
+      const auto writeRet = dst.write(gsl::make_span(buffer).subspan(transferred, remaining));
       // TODO(adebreceni):
       //   write might return 0, e.g. in case of a congested server
       //   what should we return then?
@@ -99,39 +56,30 @@ inline int64_t pipe(io::InputStream* src, io::OutputStream* dst) {
   return totalTransferred;
 }
 
-inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
-  return pipe(src.get(), dst.get());
-}
-
 }  // namespace internal
 
-class InputStreamPipe : public InputStreamCallback {
+class InputStreamPipe {
  public:
-  explicit InputStreamPipe(std::shared_ptr<io::OutputStream> output) : output_(std::move(output)) {}
+  explicit InputStreamPipe(io::OutputStream& output) : output_(&output) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return internal::pipe(stream, output_);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+    return internal::pipe(*stream, *output_);
   }
 
  private:
-  std::shared_ptr<io::OutputStream> output_;
+  gsl::not_null<io::OutputStream*> output_;
 };
 
-class OutputStreamPipe : public OutputStreamCallback {
+class OutputStreamPipe {
  public:
-  explicit OutputStreamPipe(std::shared_ptr<io::InputStream> input) : input_(std::move(input)) {}
+  explicit OutputStreamPipe(io::InputStream& input) : input_(&input) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    return internal::pipe(input_, stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+    return internal::pipe(*input_, *stream);
   }
 
  private:
-  std::shared_ptr<io::InputStream> input_;
+  gsl::not_null<io::InputStream*> input_;
 };
 
-
-
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi
diff --git a/libminifi/include/serialization/FlowFileSerializer.h b/libminifi/include/serialization/FlowFileSerializer.h
index 539306873..6948a0042 100644
--- a/libminifi/include/serialization/FlowFileSerializer.h
+++ b/libminifi/include/serialization/FlowFileSerializer.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <utility>
 #include <functional>
+#include "io/StreamCallback.h"
 
 namespace org {
 namespace apache {
@@ -38,11 +39,9 @@ class FlowFile;
 
 } /* namespace core */
 
-class InputStreamCallback;
-
 class FlowFileSerializer {
  public:
-  using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;
+  using FlowFileReader = std::function<int64_t(const std::shared_ptr<core::FlowFile>&, const io::InputStreamCallback&)>;
 
   explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {}
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 632f4c14c..be1b3b0ab 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -266,60 +266,6 @@ class SiteToSiteClient : public core::Connectable {
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
 
-// Nest Callback Class for write stream
-class WriteCallback : public OutputStreamCallback {
- public:
-  WriteCallback(DataPacket *packet) // NOLINT
-      : _packet(packet) {
-  }
-  DataPacket *_packet;
-  // void process(std::ofstream *stream) {
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    std::array<std::byte, 16384> buffer{};
-    uint64_t len = _packet->_size;
-    uint64_t total = 0;
-    while (len > 0) {
-      const auto size = std::min(len, uint64_t{16384});
-      const auto ret = _packet->transaction_->getStream().read(buffer);
-      if (ret != size) {
-        core::logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
-        return -1;
-      }
-      stream->write(buffer);
-      len -= size;
-      total += size;
-    }
-    core::logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
-    return gsl::narrow<int64_t>(len);
-  }
-};
-// Nest Callback Class for read stream
-class ReadCallback : public InputStreamCallback {
- public:
-  ReadCallback(DataPacket *packet) // NOLINT
-      : _packet(packet) {
-  }
-  DataPacket *_packet;
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-    _packet->_size = 0;
-    std::array<std::byte, 8192> buffer{};
-    size_t size = 0;
-    do {
-      const auto readSize = stream->read(buffer);
-      if (readSize == 0) break;
-      if (io::isError(readSize)) return -1;
-      const auto ret = _packet->transaction_->getStream().write(gsl::make_span(buffer).subspan(0, readSize));
-      if (io::isError(ret) || gsl::narrow<size_t>(ret) != readSize) {
-        core::logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
-        return -1;
-      }
-      size += readSize;
-    } while (size < stream->size());
-    _packet->_size = size;
-    return gsl::narrow<int64_t>(size);
-  }
-};
-
 }  // namespace sitetosite
 }  // namespace minifi
 }  // namespace nifi
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index d07e842da..bbf6dbb8c 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -31,12 +31,11 @@ namespace org::apache::nifi::minifi::utils {
 /**
  * General vector based uint8_t callback.
  */
-class ByteInputCallBack : public InputStreamCallback {
+class ByteInputCallback {
  public:
-  ByteInputCallBack() = default;
-  ~ByteInputCallBack() override = default;
+  virtual ~ByteInputCallback() = default;
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
     stream->seek(0);
 
     if (stream->size() > 0) {
@@ -76,13 +75,13 @@ class ByteInputCallBack : public InputStreamCallback {
  * While calls are thread safe, the class is intended to have
  * a single consumer.
  */
-class ByteOutputCallback : public OutputStreamCallback {
+class ByteOutputCallback {
  public:
   ByteOutputCallback() = delete;
 
   explicit ByteOutputCallback(size_t max_size, bool wait_on_read = false)
       : max_size_(max_size),
-        read_started_(wait_on_read ? false : true),
+        read_started_(!wait_on_read),
         logger_(core::logging::LoggerFactory<ByteOutputCallback>::getLogger()) {
     current_str_pos = 0;
     size_ = 0;
@@ -95,7 +94,7 @@ class ByteOutputCallback : public OutputStreamCallback {
     close();
   }
 
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
 
   virtual std::vector<char> to_string();
 
@@ -141,9 +140,9 @@ class StreamOutputCallback : public ByteOutputCallback {
       : ByteOutputCallback(max_size, wait_on_read) {
   }
 
-  virtual void write(char *data, size_t size);
+  void write(char *data, size_t size) override;
 
-  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override;
 };
 
 }  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/FileReaderCallback.h b/libminifi/include/utils/FileReaderCallback.h
index a36822221..e1030a547 100644
--- a/libminifi/include/utils/FileReaderCallback.h
+++ b/libminifi/include/utils/FileReaderCallback.h
@@ -33,13 +33,13 @@ namespace utils {
 /**
  * Simple callback to read a file, to be used with ProcessSession::write().
  */
-class FileReaderCallback : public OutputStreamCallback {
+class FileReaderCallback {
  public:
-  explicit FileReaderCallback(const std::string& file_name);
-  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override;
+  explicit FileReaderCallback(std::string file_name);
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) const;
 
  private:
-  std::ifstream input_stream_;
+  std::string file_name_;
   std::shared_ptr<core::logging::Logger> logger_;
 };
 
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 6d07ef521..91dc71394 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -59,7 +59,7 @@ struct HTTPUploadCallback {
   }
   std::mutex mutex;
   std::atomic<bool> stop;
-  ByteInputCallBack *ptr;
+  ByteInputCallback *ptr;
   size_t pos;
 
   size_t getPos() {
diff --git a/libminifi/include/utils/JsonCallback.h b/libminifi/include/utils/JsonCallback.h
index 3fb278d75..03887704d 100644
--- a/libminifi/include/utils/JsonCallback.h
+++ b/libminifi/include/utils/JsonCallback.h
@@ -33,18 +33,19 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-class JsonOutputCallback : public OutputStreamCallback {
+class JsonOutputCallback {
  public:
   explicit JsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
       writer.SetMaxDecimalPlaces(decimal_places_.value());
     root_.Accept(writer);
-    return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+    const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
+    return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
   }
 
  protected:
@@ -52,18 +53,19 @@ class JsonOutputCallback : public OutputStreamCallback {
   std::optional<uint8_t> decimal_places_;
 };
 
-class PrettyJsonOutputCallback : public OutputStreamCallback {
+class PrettyJsonOutputCallback {
  public:
   explicit PrettyJsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
       writer.SetMaxDecimalPlaces(decimal_places_.value());
     root_.Accept(writer);
-    return stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+    const auto write_return = stream->write(reinterpret_cast<const uint8_t*>(buffer.GetString()), buffer.GetSize());
+    return !io::isError(write_return) ? gsl::narrow<int64_t>(write_return) : -1;
   }
 
  protected:
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
index cf0db4b8b..6538020e2 100644
--- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -28,11 +28,11 @@
 
 namespace org::apache::nifi::minifi::utils {
 
-class LineByLineInputOutputStreamCallback : public InputOutputStreamCallback {
+class LineByLineInputOutputStreamCallback {
  public:
   using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
   explicit LineByLineInputOutputStreamCallback(CallbackType callback);
-  int64_t process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) override;
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output);
 
  private:
   int64_t readInput(io::InputStream& stream);
diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h
index d43486bc2..cf8d8c691 100644
--- a/libminifi/include/utils/StringUtils.h
+++ b/libminifi/include/utils/StringUtils.h
@@ -401,6 +401,7 @@ class StringUtils {
     return to_hex(gsl::make_span(str).as_span<const std::byte>(), uppercase);
   }
 
+
   /**
    * Decodes the Base64 encoded string into data
    * @param data the output buffer where the decoded bytes will be written. Must be at least (base64_length / 4 + 1) * 3 bytes long.
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index dbc349ea2..d3c635a1d 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -669,7 +669,7 @@ C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::Inp
     if (!archiver->newEntry({filename, file_size})) {
       throw C2DebugBundleError("Couldn't initialize archive entry for '" + filename + "'");
     }
-    if (gsl::narrow<int64_t>(file_size) != internal::pipe(stream.get(), archiver.get())) {
+    if (gsl::narrow<int64_t>(file_size) != internal::pipe(*stream, *archiver)) {
       // we have touched the input streams, they cannot be reused
       throw C2DebugBundleError("Error while writing file '" + filename + "' into the debug bundle");
     }
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 1c3f0c87d..eabebf5d5 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -232,7 +232,7 @@ void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relat
   flow->setDeleted(false);
 }
 
-void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
   auto flow_file_equality_checker = [&flow](const auto& flow_file) { return flow == flow_file; };
   gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
       || _addedFlowFiles.contains(flow->getUUID())
@@ -247,7 +247,7 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
     }
-    if (callback->process(stream) < 0) {
+    if (callback(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
 
@@ -269,18 +269,16 @@ void ProcessSession::write(const std::shared_ptr<core::FlowFile> &flow, OutputSt
 }
 
 void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
-  struct BufferOutputStreamCallback : OutputStreamCallback {
-    explicit BufferOutputStreamCallback(gsl::span<const char> buffer) :buffer{buffer} {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) final {
-      return stream->write(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size());
-    }
-    gsl::span<const char> buffer;
-  };
-  BufferOutputStreamCallback cb{ buffer };
-  write(flow_file, &cb);
+  writeBuffer(flow_file, buffer.as_span<const std::byte>());
+}
+void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
+  write(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+    const auto write_status = output_stream->write(buffer);
+    return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+  });
 }
 
-void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback) {
+void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, const io::OutputStreamCallback& callback) {
   auto flow_file_equality_checker = [&flow](const auto& flow_file) { return flow == flow_file; };
   gsl_ExpectsAudit(_updatedFlowFiles.contains(flow->getUUID())
       || _addedFlowFiles.contains(flow->getUUID())
@@ -305,7 +303,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     // this prevents an issue if we write, above, with zero length.
     if (stream_size_before_callback > 0)
       stream->seek(stream_size_before_callback + 1);
-    if (callback->process(stream) < 0) {
+    if (callback(stream) < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
     flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
@@ -322,8 +320,17 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
     throw;
   }
 }
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const char> buffer) {
+  appendBuffer(flow_file, buffer.as_span<const std::byte>());
+}
+void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
+  append(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+    const auto write_status = output_stream->write(buffer);
+    return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
+  });
+}
 
-int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
+int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, const io::InputStreamCallback& callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
@@ -345,7 +352,7 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
     }
 
     auto flow_file_stream = std::make_shared<io::StreamSlice>(stream, flow->getOffset(), flow->getSize());
-    auto ret = callback->process(flow_file_stream);
+    auto ret = callback(flow_file_stream);
     if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
@@ -359,7 +366,8 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputS
   }
 }
 
-int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, InputOutputStreamCallback *callback) {
+
+int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, const io::InputOutputStreamCallback& callback) {
   gsl_Expects(callback);
 
   try {
@@ -384,7 +392,7 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, I
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for write");
     }
 
-    int64_t bytes_written = callback->process(input_stream, output_stream);
+    int64_t bytes_written = callback(input_stream, output_stream);
     if (bytes_written < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
@@ -408,25 +416,15 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, I
 
 detail::ReadBufferResult ProcessSession::readBuffer(const std::shared_ptr<core::FlowFile>& flow) {
   detail::ReadBufferResult result;
-  struct Callback : InputStreamCallback {
-    detail::ReadBufferResult& result;
-    ProcessSession& session;
-    Callback(detail::ReadBufferResult& result, ProcessSession& session)
-      :result{result}, session{session}
-    {}
-    int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) final {
-      gsl_Expects(inputStream);
-      result.buffer.resize(inputStream->size());
-      const auto read_status = inputStream->read(result.buffer);
-      if (read_status != result.buffer.size()) {
-        session.logger_->log_error("readBuffer: %zu bytes were requested from the stream but %zu bytes were read. Rolling back.", result.buffer.size(), read_status);
-        throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
-      }
-      return gsl::narrow<int64_t>(read_status);
+  result.status = read(flow, [&result, this](const std::shared_ptr<io::BaseStream>& input_stream) {
+    result.buffer.resize(input_stream->size());
+    const auto read_status = input_stream->read(result.buffer);
+    if (read_status != result.buffer.size()) {
+      logger_->log_error("readBuffer: %zu bytes were requested from the stream but %zu bytes were read. Rolling back.", result.buffer.size(), read_status);
+      throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
     }
-  };
-  Callback cb{result, *this};
-  result.status = read(flow, &cb);
+    return gsl::narrow<int64_t>(read_status);
+  });
   return result;
 }
 
@@ -686,7 +684,7 @@ bool ProcessSession::exportContent(const std::string &destination, const std::st
   logger_->log_debug("Exporting content of %s to %s", flow->getUUIDStr(), destination);
 
   ProcessSessionReadCallback cb(tmpFile, destination, logger_);
-  read(flow, &cb);
+  read(flow, std::ref(cb));
 
   logger_->log_info("Committing %s", destination);
   bool commit_ok = cb.commit();
@@ -1022,8 +1020,8 @@ void ProcessSession::ensureNonNullResourceClaim(
         logger_->log_debug("Processor %s (%s) did not create a ResourceClaim, creating an empty one",
                            process_context_->getProcessorNode()->getUUIDStr(),
                            process_context_->getProcessorNode()->getName());
-        OutputStreamPipe emptyStreamCallback(std::make_shared<io::BufferStream>());
-        write(flowFile, &emptyStreamCallback);
+        io::BufferStream emptyBufferStream;
+        write(flowFile, OutputStreamPipe{emptyBufferStream});
       }
     }
   }
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 7d925b595..810e46c89 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -43,7 +43,7 @@ ProcessSessionReadCallback::ProcessSessionReadCallback(const std::string &tmpFil
 }
 
 // Copy the entire file contents to the temporary file
-int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ProcessSessionReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   _writeSucceeded = false;
   size_t size = 0;
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp
index 588c36ffc..5bd69c40e 100644
--- a/libminifi/src/serialization/FlowFileV3Serializer.cpp
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -92,9 +92,8 @@ int64_t FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& f
     if (io::isError(ret)) return -1;
     sum += ret;
   }
-  InputStreamPipe pipe(out);
   {
-    const auto ret = reader_(flowFile, &pipe);
+    const auto ret = reader_(flowFile, InputStreamPipe{*out});
     if (ret < 0) return -1;
     sum += gsl::narrow<size_t>(ret);
   }
diff --git a/libminifi/src/serialization/PayloadSerializer.cpp b/libminifi/src/serialization/PayloadSerializer.cpp
index 8bfa1128a..4da0d0bf3 100644
--- a/libminifi/src/serialization/PayloadSerializer.cpp
+++ b/libminifi/src/serialization/PayloadSerializer.cpp
@@ -25,8 +25,7 @@ namespace nifi {
 namespace minifi {
 
 int64_t PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
-  InputStreamPipe pipe(out);
-  return reader_(flowFile, &pipe);
+  return reader_(flowFile, InputStreamPipe{*out});
 }
 
 } /* namespace minifi */
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 968ea397c..fbe118ce4 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -471,8 +471,12 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
       return -1;
     }
     if (flowFile->getSize() > 0) {
-      sitetosite::ReadCallback callback(packet);
-      session->read(flowFile, &callback);
+      session->read(flowFile, [packet](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+        const auto result = internal::pipe(*input_stream, packet->transaction_->getStream());
+        if (result == -1) return -1;
+        packet->_size = gsl::narrow<size_t>(result);
+        return result;
+      });
       if (flowFile->getSize() != packet->_size) {
         logger_->log_debug("Mismatched sizes %llu %llu", flowFile->getSize(), packet->_size);
         return -2;
@@ -694,8 +698,9 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
       }
 
       if (packet._size > 0) {
-        sitetosite::WriteCallback callback(&packet);
-        session->write(flowFile, &callback);
+        session->write(flowFile, [&packet](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+          return internal::pipe(packet.transaction_->getStream(), *output_stream);
+        });
         if (flowFile->getSize() != packet._size) {
           std::stringstream message;
           message << "Receive size not correct, expected to send " << flowFile->getSize() << " bytes, but actually sent " << packet._size;
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 429cba73b..8fe16a946 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -30,7 +30,7 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ByteOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   if (stream->size() > 0) {
     std::vector<std::byte> buffer;
@@ -42,7 +42,7 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   return gsl::narrow<int64_t>(size_.load());
 }
 
-int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t StreamOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
diff --git a/libminifi/src/utils/FileReaderCallback.cpp b/libminifi/src/utils/FileReaderCallback.cpp
index ee5958655..825ae3647 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/FileReaderCallback.cpp
@@ -35,31 +35,32 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-FileReaderCallback::FileReaderCallback(const std::string& file_name)
-    : logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) {
-  logger_->log_debug("Opening %s", file_name);
-  input_stream_.open(file_name.c_str(), std::fstream::in | std::fstream::binary);
-  if (!input_stream_.is_open()) {
-    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)), errno);
-  }
+FileReaderCallback::FileReaderCallback(std::string file_name)
+    : file_name_{std::move(file_name)},
+    logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) {
 }
 
-int64_t FileReaderCallback::process(const std::shared_ptr<io::BaseStream>& output_stream) {
+int64_t FileReaderCallback::operator()(const std::shared_ptr<io::BaseStream>& output_stream) const {
   std::array<char, BUFFER_SIZE> buffer;
   uint64_t num_bytes_written = 0;
 
-  while (input_stream_.good()) {
-    input_stream_.read(buffer.data(), buffer.size());
-    if (input_stream_.bad()) {
+  std::ifstream input_stream{file_name_, std::ifstream::in | std::ifstream::binary};
+  if (!input_stream.is_open()) {
+    throw FileReaderCallbackIOError(StringUtils::join_pack("Error opening file: ", std::strerror(errno)), errno);
+  }
+  logger_->log_debug("Opening %s", file_name_);
+  while (input_stream.good()) {
+    input_stream.read(buffer.data(), buffer.size());
+    if (input_stream.bad()) {
       throw FileReaderCallbackIOError(StringUtils::join_pack("Error reading file: ", std::strerror(errno)), errno);
     }
-    const auto num_bytes_read = input_stream_.gcount();
+    const auto num_bytes_read = input_stream.gcount();
     logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
-    const int len = gsl::narrow<int>(num_bytes_read);
+    const auto len = gsl::narrow<size_t>(num_bytes_read);
     output_stream->write(reinterpret_cast<uint8_t*>(buffer.data()), len);
     num_bytes_written += len;
   }
-  input_stream_.close();
+  input_stream.close();
 
   logger_->log_debug("Finished reading %" PRIu64 " bytes from the file", num_bytes_written);
   return num_bytes_written;
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
index 4df55fde4..f06ba7786 100644
--- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -25,7 +25,7 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
   : callback_(std::move(callback)) {
 }
 
-int64_t LineByLineInputOutputStreamCallback::process(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
   gsl_Expects(input);
   gsl_Expects(output);
 
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
index ba8564eff..72434b258 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.cpp
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -16,10 +16,7 @@
  */
 #include "ReadFromFlowFileTestProcessor.h"
 
-#include <string>
 #include <utility>
-#include <vector>
-
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -34,18 +31,6 @@ void ReadFromFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::Proc
   logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
 }
 
-namespace {
-struct ReadFlowFileIntoBuffer : public InputStreamCallback {
-  std::vector<std::byte> buffer_;
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    buffer_.resize(stream->size());
-    size_t bytes_read = stream->read(buffer_);
-    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
-  }
-};
-}  // namespace
-
 void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
@@ -53,8 +38,6 @@ void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, cor
     clear();
 
   while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
-    ReadFlowFileIntoBuffer callback;
-    session->read(flow_file, &callback);
     session->transfer(flow_file, Success);
     flow_files_read_.emplace_back(session, gsl::not_null(std::move(flow_file)));
   }
@@ -65,9 +48,7 @@ void ReadFromFlowFileTestProcessor::onUnSchedule() {
 }
 
 ReadFromFlowFileTestProcessor::FlowFileData::FlowFileData(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file) {
-  ReadFlowFileIntoBuffer callback;
-  session->read(flow_file, &callback);
-  content_ = utils::span_to<std::string>(gsl::make_span(callback.buffer_).as_span<char>());
+  content_ = to_string(session->readBuffer(flow_file));
   attributes_ = flow_file->getAttributes();
 }
 
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index efa20c362..6e94d80e9 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -603,7 +603,7 @@ std::string TestPlan::getContent(const minifi::core::FlowFile& file) const {
   auto content_claim = file.getResourceClaim();
   auto content_stream = content_repo_->read(*content_claim);
   auto output_stream = std::make_shared<minifi::io::BufferStream>();
-  minifi::InputStreamPipe{output_stream}.process(content_stream);
+  minifi::InputStreamPipe{*output_stream}(content_stream);
   return utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
 }
 
diff --git a/libminifi/test/WriteToFlowFileTestProcessor.cpp b/libminifi/test/WriteToFlowFileTestProcessor.cpp
index 90a509029..f8fa19026 100644
--- a/libminifi/test/WriteToFlowFileTestProcessor.cpp
+++ b/libminifi/test/WriteToFlowFileTestProcessor.cpp
@@ -17,9 +17,6 @@
 
 #include "WriteToFlowFileTestProcessor.h"
 
-#include <string>
-#include <vector>
-
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::processors {
@@ -35,19 +32,6 @@ void WriteToFlowFileTestProcessor::onSchedule(core::ProcessContext*, core::Proce
   logger_->log_info("%s", ON_SCHEDULE_LOG_STR);
 }
 
-namespace {
-struct WriteToFlowFileCallback : public OutputStreamCallback {
-  const gsl::span<const uint8_t> content_;
-
-  explicit WriteToFlowFileCallback(const std::string& content) : content_(reinterpret_cast<const uint8_t*>(content.data()), content.size()) {}
-
-  int64_t process(const std::shared_ptr<io::BaseStream> &stream) override {
-    size_t bytes_written = stream->write(content_.begin(), content_.size());
-    return io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-}  // namespace
-
 void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
@@ -60,8 +44,7 @@ void WriteToFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core
     logger_->log_error("Failed to create flowfile!");
     return;
   }
-  WriteToFlowFileCallback callback(content_);
-  session->write(flow_file, &callback);
+  session->writeBuffer(flow_file, content_);
   session->transfer(flow_file, Success);
 }
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 6804e3a45..2792b416c 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -44,7 +44,7 @@
 #include "../Utils.h"
 #include "utils/gsl.h"
 
-class ReadCallback : public minifi::InputStreamCallback {
+class ReadCallback {
  public:
   explicit ReadCallback(size_t size)
       :buffer_{size}
@@ -54,7 +54,7 @@ class ReadCallback : public minifi::InputStreamCallback {
   ReadCallback& operator=(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback&&) = delete;
 
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     int64_t total_read = 0;
     do {
       const auto ret = stream->read(gsl::make_span(buffer_).subspan(read_size_));
@@ -175,7 +175,7 @@ class CompressDecompressionTestController : public TestController{
   }
 
   void read(const std::shared_ptr<core::FlowFile>& file, ReadCallback& reader) {
-    helper_session->read(file, &reader);
+    helper_session->read(file, std::ref(reader));
   }
 
  public:
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 932ec04c5..0ed9fa877 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -65,7 +65,7 @@ void init_file_paths() {
   static Initializer initializer;
 }
 
-class FixedBuffer : public minifi::InputStreamCallback {
+class FixedBuffer {
  public:
   explicit FixedBuffer(std::size_t capacity) : capacity_(capacity) {
     buf_.reset(new uint8_t[capacity_]);
@@ -97,7 +97,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     } while (size_ != capacity_);
     return total_read;
   }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     return write(*stream, capacity_);
   }
 
@@ -246,13 +246,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragment", "[mergefiletest1]")
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -308,13 +308,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDelimiter", "[mergefil
   REQUIRE(flow1->getSize() == 128);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 128);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -371,13 +371,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileDefragmentDropFlow", "[mergefile
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 64);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -419,13 +419,13 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileBinPack", "[mergefiletest4]") {
   REQUIRE(flow1->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2->getSize() == 96);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -463,7 +463,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
   REQUIRE(flow1->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
@@ -473,7 +473,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileTar", "[mergefiletest4]") {
   REQUIRE(flow2->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
@@ -514,7 +514,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
   REQUIRE(flow1->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 0; i < 3; i++) {
@@ -524,7 +524,7 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileZip", "[mergefiletest5]") {
   REQUIRE(flow2->getSize() > 0);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     auto archives = read_archives(callback);
     REQUIRE(archives.size() == 3);
     for (int i = 3; i < 6; i++) {
@@ -572,12 +572,12 @@ TEST_CASE_METHOD(MergeTestController, "MergeFileOnAttribute", "[mergefiletest5]"
   std::shared_ptr<core::FlowFile> flow2 = output_->poll(expiredFlowRecords);
   {
     FixedBuffer callback(flow1->getSize());
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   {
     FixedBuffer callback(flow2->getSize());
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
@@ -697,10 +697,10 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
 
   core::ProcessSession session(context);
 
-  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, const minifi::io::InputStreamCallback& cb) {
     return session.read(ff, cb);
   });
-  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, const minifi::io::InputStreamCallback& cb) {
     return session.read(ff, cb);
   });
 
@@ -768,7 +768,7 @@ TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
   REQUIRE(expiredFlowRecords.empty());
   {
     FixedBuffer callback(flow->getSize());
-    session.read(flow, &callback);
+    session.read(flow, std::ref(callback));
     REQUIRE(callback.to_string() == expected);
   }
 
@@ -812,13 +812,13 @@ TEST_CASE_METHOD(MergeTestController, "Batch Size", "[testMergeFileBatchSize]")
   REQUIRE(flow1);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    sessionGenFlowFile.read(flow1, std::ref(callback));
     REQUIRE(callback.to_string() == expected[0]);
   }
   REQUIRE(flow2);
   {
     FixedBuffer callback(gsl::narrow<size_t>(flow2->getSize()));
-    sessionGenFlowFile.read(flow2, &callback);
+    sessionGenFlowFile.read(flow2, std::ref(callback));
     REQUIRE(callback.to_string() == expected[1]);
   }
 }
diff --git a/libminifi/test/persistence-tests/PersistenceTests.cpp b/libminifi/test/persistence-tests/PersistenceTests.cpp
index 276a06445..19e51af29 100644
--- a/libminifi/test/persistence-tests/PersistenceTests.cpp
+++ b/libminifi/test/persistence-tests/PersistenceTests.cpp
@@ -109,8 +109,7 @@ struct TestFlow{
     return flow;
   }
   std::string read(const std::shared_ptr<core::FlowFile>& file) {
-    core::ProcessSession session(processorContext);
-    return to_string(session.readBuffer(file));
+    return to_string(core::ProcessSession{processorContext}.readBuffer(file));
   }
   void trigger() {
     auto session = std::make_shared<core::ProcessSession>(processorContext);
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 49c735e7d..dc4d656ac 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -33,21 +33,10 @@
 
 namespace ContentRepositoryDependentTests {
 
-struct WriteStringToFlowFile : public minifi::OutputStreamCallback {
-  const std::vector<uint8_t> buffer_;
-
-  explicit WriteStringToFlowFile(const std::string& buffer) : buffer_(buffer.begin(), buffer.end()) {}
-
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
-    size_t bytes_written = stream->write(buffer_, buffer_.size());
-    return minifi::io::isError(bytes_written) ? -1 : gsl::narrow<int64_t>(bytes_written);
-  }
-};
-
-struct ReadUntilItCan : public minifi::InputStreamCallback {
+struct ReadUntilItCan {
   std::string value_;
 
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream> &stream) override {
+  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream> &stream) {
     value_.clear();
     std::array<std::byte, 1024> buffer{};
     size_t bytes_read = 0;
@@ -93,14 +82,12 @@ class Fixture {
   }
 
   void writeToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content) {
-    WriteStringToFlowFile callback(content);
-    process_session_->write(flow_file, &callback);
+    process_session_->writeBuffer(flow_file, content);
   }
 
   void appendToFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, const std::string content_to_append) {
-    WriteStringToFlowFile callback(content_to_append);
     process_session_->add(flow_file);
-    process_session_->append(flow_file, &callback);
+    process_session_->appendBuffer(flow_file, content_to_append);
   }
 
  private:
@@ -125,17 +112,17 @@ void testReadOnSmallerClonedFlowFiles(std::shared_ptr<core::ContentRepository> c
   REQUIRE(clone_second_half != nullptr);
   ReadUntilItCan read_until_it_can_callback;
   const auto read_result_original = process_session.readBuffer(original_ff);
-  process_session.read(original_ff, &read_until_it_can_callback);
+  process_session.read(original_ff, std::ref(read_until_it_can_callback));
   CHECK(original_ff->getSize() == 6);
   CHECK(to_string(read_result_original) == "foobar");
   CHECK(read_until_it_can_callback.value_ == "foobar");
   const auto read_result_first_half = process_session.readBuffer(clone_first_half);
-  process_session.read(clone_first_half, &read_until_it_can_callback);
+  process_session.read(clone_first_half, std::ref(read_until_it_can_callback));
   CHECK(clone_first_half->getSize() == 3);
   CHECK(to_string(read_result_first_half) == "foo");
   CHECK(read_until_it_can_callback.value_ == "foo");
   const auto read_result_second_half = process_session.readBuffer(clone_second_half);
-  process_session.read(clone_second_half, &read_until_it_can_callback);
+  process_session.read(clone_second_half, std::ref(read_until_it_can_callback));
   CHECK(clone_second_half->getSize() == 3);
   CHECK(to_string(read_result_second_half) == "bar");
   CHECK(read_until_it_can_callback.value_ == "bar");
@@ -155,7 +142,7 @@ void testAppendToUnmanagedFlowFile(std::shared_ptr<core::ContentRepository> cont
   CHECK(flow_file->getSize() == 8);
   ReadUntilItCan read_until_it_can_callback;
   const auto read_result = process_session.readBuffer(flow_file);
-  process_session.read(flow_file, &read_until_it_can_callback);
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
   CHECK(to_string(read_result) == "myfoobar");
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
@@ -173,8 +160,8 @@ void testAppendToManagedFlowFile(std::shared_ptr<core::ContentRepository> conten
   CHECK(flow_file->getSize() == 8);
   const auto read_result = process_session.readBuffer(flow_file);
   ReadUntilItCan read_until_it_can_callback;
-  process_session.read(flow_file, &read_until_it_can_callback);
   CHECK(to_string(read_result) == "myfoobar");
+  process_session.read(flow_file, std::ref(read_until_it_can_callback));
   CHECK(read_until_it_can_callback.value_ == "myfoobar");
 }
 
@@ -187,7 +174,6 @@ void testReadFromZeroLengthFlowFile(std::shared_ptr<core::ContentRepository> con
 
   CHECK(flow_file->getSize() == 0);
   REQUIRE_NOTHROW(process_session.readBuffer(flow_file));
-  ReadUntilItCan read_until_it_can_callback;
-  REQUIRE_NOTHROW(process_session.read(flow_file, &read_until_it_can_callback));
+  REQUIRE_NOTHROW(process_session.read(flow_file, ReadUntilItCan{}));
 }
 }  // namespace ContentRepositoryDependentTests
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp
index aa8ffa3d3..dcfca26eb 100644
--- a/libminifi/test/unit/FlowFileSerializationTests.cpp
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -47,8 +47,8 @@ TEST_CASE("Payload Serializer", "[testPayload]") {
   flowFile->addAttribute("first", "one");
   flowFile->addAttribute("second", "two");
 
-  minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
-    return gsl::narrow<int>(cb->process(contentStream));
+  minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, const minifi::io::InputStreamCallback& cb) {
+    return cb(contentStream);
   });
   serializer.serialize(flowFile, result);
   const auto serialized = utils::span_to<std::string>(result->getBuffer().as_span<const char>());
@@ -67,8 +67,8 @@ TEST_CASE("FFv3 Serializer", "[testFFv3]") {
   flowFile->addAttribute("first", "one");
   flowFile->addAttribute("second", "two");
 
-  minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
-    return gsl::narrow<int>(cb->process(contentStream));
+  minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, const minifi::io::InputStreamCallback& cb) {
+    return cb(contentStream);
   });
   serializer.serialize(flowFile, result);
   const auto serialized = utils::span_to<std::string>(result->getBuffer().as_span<const char>());
diff --git a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
index 1266ec615..74e59f1c6 100644
--- a/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
+++ b/libminifi/test/unit/LineByLineInputOutputStreamCallbackTests.cpp
@@ -70,7 +70,7 @@ TEST_CASE("LineByLineInputOutputStreamCallback can process a stream line by line
   }
 
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   const auto output_data = utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
   CHECK(output_data == expected_output);
 }
@@ -91,7 +91,7 @@ TEST_CASE("LineByLineInputOutputStreamCallback can handle Windows line endings",
                                "3: Five six, picking up sticks\r\n";
 
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   const auto output_data = utils::span_to<std::string>(output_stream->getBuffer().as_span<const char>());
   CHECK(output_data == expected_output);
 }
@@ -101,6 +101,6 @@ TEST_CASE("LineByLineInputOutputStreamCallback can handle an empty input", "[pro
   const auto output_stream = std::make_shared<minifi::io::BufferStream>();
   const auto line_processor = [](const std::string& input_line, bool, bool) { return input_line; };
   LineByLineInputOutputStreamCallback line_by_line_input_output_stream_callback{line_processor};
-  line_by_line_input_output_stream_callback.process(input_stream, output_stream);
+  line_by_line_input_output_stream_callback(input_stream, output_stream);
   CHECK(output_stream->size() == 0);
 }
diff --git a/libminifi/test/unit/LoggerTests.cpp b/libminifi/test/unit/LoggerTests.cpp
index 73f4751b2..deec59861 100644
--- a/libminifi/test/unit/LoggerTests.cpp
+++ b/libminifi/test/unit/LoggerTests.cpp
@@ -156,7 +156,7 @@ using namespace minifi::io;
 std::string decompress(const std::shared_ptr<InputStream>& input) {
   auto output = std::make_unique<BufferStream>();
   auto decompressor = std::make_shared<ZlibDecompressStream>(gsl::make_not_null(output.get()));
-  minifi::internal::pipe(input, decompressor);
+  minifi::internal::pipe(*input, *decompressor);
   decompressor->close();
   return utils::span_to<std::string>(output->getBuffer().as_span<const char>());
 }


[nifi-minifi-cpp] 02/03: MINIFICPP-1721 Fix GetFileTests transient failure

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit b9545e7dc94c0953c6e1c3a403638041448bc7db
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Tue May 3 14:47:01 2022 +0200

    MINIFICPP-1721 Fix GetFileTests transient failure
    
    Closes #1317
    Signed-off-by: Martin Zink <ma...@apache.org>
---
 extensions/standard-processors/tests/unit/GetFileTests.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions/standard-processors/tests/unit/GetFileTests.cpp b/extensions/standard-processors/tests/unit/GetFileTests.cpp
index cf00ea5e5..d6a33c51e 100644
--- a/extensions/standard-processors/tests/unit/GetFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/GetFileTests.cpp
@@ -251,8 +251,8 @@ TEST_CASE("Test if GetFile honors PollInterval property when triggered multiple
   test_controller.setProperty(minifi::processors::GetFile::PollInterval, "100 ms");
   test_controller.setProperty(minifi::processors::GetFile::KeepSourceFile, "true");
 
-  test_controller.runSession();
   auto start_time = std::chrono::steady_clock::now();
+  test_controller.runSession();
   while (LogTestController::getInstance().countOccurrences("Logged 2 flow files") < 2) {
     test_controller.test_plan_->reset();
     test_controller.runSession();