You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/10/12 15:22:23 UTC

[nifi-minifi-cpp] branch main updated (d93b84e5a -> 7ab10859f)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


    from d93b84e5a MINIFICPP-1945 Fix memory leak in ConsumeWindowsEventLog
     new 0014de517 MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors
     new 7ab10859f MINIFICPP-1648 Input/OutputStreamCallback should use Input/OutputStream instead of BaseStream

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 controller/MiNiFiController.cpp                    |  1 -
 extensions/aws/processors/DeleteS3Object.h         |  2 +-
 extensions/aws/processors/FetchS3Object.cpp        | 16 ++-----
 extensions/aws/processors/FetchS3Object.h          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  4 +-
 extensions/aws/s3/S3Wrapper.cpp                    |  6 +--
 extensions/aws/s3/S3Wrapper.h                      |  7 ++-
 .../azure/processors/DeleteAzureBlobStorage.h      |  2 +-
 .../azure/processors/DeleteAzureDataLakeStorage.h  |  2 +-
 .../azure/processors/FetchAzureBlobStorage.cpp     |  2 +-
 .../azure/processors/FetchAzureDataLakeStorage.cpp |  2 +-
 .../azure/processors/FetchAzureDataLakeStorage.h   |  2 +-
 extensions/azure/processors/PutAzureBlobStorage.h  |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  4 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  2 +-
 extensions/azure/storage/AzureBlobStorage.h        |  2 +-
 .../azure/storage/AzureBlobStorageClient.cpp       | 40 +++++++----------
 extensions/azure/storage/AzureBlobStorageClient.h  |  5 +--
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.h    |  2 +-
 .../azure/storage/AzureDataLakeStorageClient.cpp   | 26 ++++-------
 .../azure/storage/AzureDataLakeStorageClient.h     | 13 +++---
 extensions/coap/protocols/CoapC2Protocol.cpp       |  6 +--
 extensions/coap/protocols/CoapC2Protocol.h         | 14 +++---
 extensions/coap/tests/CoapC2VerifyHeartbeat.cpp    |  1 -
 extensions/gcp/processors/FetchGCSObject.cpp       |  2 +-
 extensions/gcp/processors/PutGCSObject.cpp         |  2 +-
 extensions/http-curl/client/HTTPCallback.h         |  2 +-
 extensions/http-curl/sitetosite/HTTPProtocol.cpp   |  2 +-
 extensions/http-curl/sitetosite/HTTPProtocol.h     |  2 +-
 extensions/http-curl/tests/CivetStream.h           |  1 -
 extensions/http-curl/tests/HTTPHandlers.h          |  2 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |  1 -
 extensions/jni/jvm/JniProcessSession.h             |  1 -
 extensions/jni/jvm/JniReferenceObjects.h           |  4 +-
 extensions/libarchive/CompressContent.h            |  4 +-
 extensions/libarchive/FocusArchiveEntry.cpp        |  6 +--
 extensions/libarchive/FocusArchiveEntry.h          |  2 +-
 extensions/libarchive/MergeContent.h               |  6 +--
 extensions/libarchive/UnfocusArchiveEntry.cpp      |  4 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |  2 +-
 extensions/librdkafka/PublishKafka.cpp             |  4 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  2 +-
 extensions/opencv/CaptureRTSPFrame.cpp             | 14 ++----
 extensions/opencv/MotionDetector.cpp               |  2 +-
 .../lua/{LuaBaseStream.cpp => LuaInputStream.cpp}  | 22 +++-------
 .../lua/{LuaBaseStream.h => LuaInputStream.h}      | 27 +++---------
 .../script/lua/LuaOutputStream.cpp                 | 31 ++++++--------
 .../lua/{LuaBaseStream.h => LuaOutputStream.h}     | 26 +++--------
 extensions/script/lua/LuaProcessSession.cpp        | 22 +++-------
 extensions/script/lua/LuaProcessSession.h          | 17 +++-----
 extensions/script/lua/LuaScriptEngine.cpp          | 10 +++--
 .../python/{PyBaseStream.cpp => PyInputStream.cpp} | 12 ++----
 .../python/{PyBaseStream.h => PyInputStream.h}     |  9 ++--
 .../python/{PyBaseStream.h => PyOutputStream.cpp}  | 27 +++++-------
 .../python/{PyBaseStream.h => PyOutputStream.h}    | 10 ++---
 extensions/script/python/PyProcessSession.cpp      |  8 ++--
 extensions/script/python/PyProcessSession.h        |  3 +-
 extensions/script/python/PythonBindings.h          | 13 +++---
 extensions/sftp/client/SFTPClient.cpp              | 24 +++++------
 extensions/sftp/client/SFTPClient.h                |  6 +--
 extensions/sftp/processors/FetchSFTP.cpp           |  7 +--
 extensions/sftp/processors/PutSFTP.cpp             |  2 +-
 .../processors/ExecuteProcess.h                    |  1 -
 .../standard-processors/processors/ExtractText.cpp |  2 +-
 .../standard-processors/processors/ExtractText.h   |  2 +-
 .../standard-processors/processors/HashContent.cpp | 14 ++----
 .../standard-processors/processors/HashContent.h   | 14 +++---
 .../standard-processors/processors/PutFile.cpp     |  4 +-
 .../standard-processors/processors/PutFile.h       | 21 +++------
 .../standard-processors/processors/RouteText.cpp   |  2 +-
 .../standard-processors/processors/TailFile.cpp    |  8 ++--
 extensions/tensorflow/TFApplyGraph.cpp             |  6 +--
 extensions/tensorflow/TFApplyGraph.h               |  8 ++--
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  4 +-
 extensions/tensorflow/TFConvertImageToTensor.h     |  6 ++-
 extensions/tensorflow/TFExtractTopLabels.cpp       |  4 +-
 extensions/tensorflow/TFExtractTopLabels.h         |  6 ++-
 extensions/usb-camera/GetUSBCamera.cpp             |  2 +-
 extensions/usb-camera/GetUSBCamera.h               |  2 +-
 libminifi/include/FlowFileRecord.h                 |  1 -
 libminifi/include/c2/PayloadSerializer.h           | 15 ++++---
 libminifi/include/core/ContentRepository.h         |  1 -
 libminifi/include/core/ProcessSession.h            |  6 +--
 .../include/core/ProcessSessionReadCallback.h      | 23 +++-------
 libminifi/include/io/BaseStream.h                  | 12 +-----
 libminifi/include/io/CRCStream.h                   |  3 +-
 libminifi/include/io/StreamCallback.h              |  9 ++--
 libminifi/include/io/StreamPipe.h                  |  7 +--
 libminifi/include/io/StreamSlice.h                 | 12 +++---
 libminifi/include/io/ZlibStream.h                  |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 19 ++------
 libminifi/include/utils/ByteArrayCallback.h        | 20 ++-------
 libminifi/include/utils/FileReaderCallback.h       | 14 ++----
 libminifi/include/utils/JsonCallback.h             |  6 +--
 .../utils/LineByLineInputOutputStreamCallback.h    |  5 ++-
 libminifi/src/core/ProcessSession.cpp              | 14 +++---
 libminifi/src/core/ProcessSessionReadCallback.cpp  | 21 +++------
 libminifi/src/io/BaseStream.cpp                    | 33 --------------
 libminifi/src/io/StreamSlice.cpp                   |  2 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  6 +--
 libminifi/src/utils/ByteArrayCallback.cpp          | 29 ++-----------
 libminifi/src/utils/FileReaderCallback.cpp         | 14 ++----
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  2 +-
 .../test/archive-tests/CompressContentTests.cpp    |  2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |  4 +-
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  4 +-
 libminifi/test/rocksdb-tests/SwapTests.cpp         |  2 +-
 libminifi/test/unit/CRCTests.cpp                   | 22 +++++-----
 .../test/unit/ContentRepositoryDependentTests.h    |  2 +-
 libminifi/test/unit/FlowFileSerializationTests.cpp |  1 -
 libminifi/test/unit/SerializationTests.cpp         |  1 -
 libminifi/test/unit/StreamTests.cpp                |  8 ++--
 nanofi/include/cxx/CallbackProcessor.h             |  1 -
 nanofi/include/cxx/ReflexiveSession.h              | 50 +++++++---------------
 118 files changed, 360 insertions(+), 633 deletions(-)
 rename extensions/script/lua/{LuaBaseStream.cpp => LuaInputStream.cpp} (77%)
 copy extensions/script/lua/{LuaBaseStream.h => LuaInputStream.h} (67%)
 copy libminifi/src/core/repository/VolatileRepository.cpp => extensions/script/lua/LuaOutputStream.cpp (68%)
 rename extensions/script/lua/{LuaBaseStream.h => LuaOutputStream.h} (67%)
 rename extensions/script/python/{PyBaseStream.cpp => PyInputStream.cpp} (80%)
 copy extensions/script/python/{PyBaseStream.h => PyInputStream.h} (84%)
 copy extensions/script/python/{PyBaseStream.h => PyOutputStream.cpp} (72%)
 rename extensions/script/python/{PyBaseStream.h => PyOutputStream.h} (83%)
 delete mode 100644 libminifi/src/io/BaseStream.cpp


[nifi-minifi-cpp] 02/02: MINIFICPP-1648 Input/OutputStreamCallback should use Input/OutputStream instead of BaseStream

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7ab10859fbec2a33f9edab4302f0998fe6f123d3
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Oct 12 16:52:21 2022 +0200

    MINIFICPP-1648 Input/OutputStreamCallback should use Input/OutputStream instead of BaseStream
    
    + Change additional basestreams to specific input/output streams
    
    Closes #1428
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 controller/MiNiFiController.cpp                    |  1 -
 extensions/aws/processors/FetchS3Object.cpp        | 16 ++-----
 extensions/aws/processors/PutS3Object.h            |  2 +-
 extensions/aws/s3/S3Wrapper.cpp                    |  6 +--
 extensions/aws/s3/S3Wrapper.h                      |  7 ++-
 .../azure/processors/FetchAzureBlobStorage.cpp     |  2 +-
 .../azure/processors/FetchAzureDataLakeStorage.cpp |  2 +-
 extensions/azure/processors/PutAzureBlobStorage.h  |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.cpp   |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  2 +-
 extensions/azure/storage/AzureBlobStorage.cpp      |  2 +-
 extensions/azure/storage/AzureBlobStorage.h        |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  2 +-
 extensions/azure/storage/AzureDataLakeStorage.h    |  2 +-
 extensions/coap/protocols/CoapC2Protocol.cpp       |  6 +--
 extensions/coap/protocols/CoapC2Protocol.h         | 14 +++---
 extensions/coap/tests/CoapC2VerifyHeartbeat.cpp    |  1 -
 extensions/gcp/processors/FetchGCSObject.cpp       |  2 +-
 extensions/gcp/processors/PutGCSObject.cpp         |  2 +-
 extensions/http-curl/client/HTTPCallback.h         |  2 +-
 extensions/http-curl/sitetosite/HTTPProtocol.cpp   |  2 +-
 extensions/http-curl/sitetosite/HTTPProtocol.h     |  2 +-
 extensions/http-curl/tests/CivetStream.h           |  1 -
 extensions/http-curl/tests/HTTPHandlers.h          |  2 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |  1 -
 extensions/jni/jvm/JniProcessSession.h             |  1 -
 extensions/jni/jvm/JniReferenceObjects.h           |  4 +-
 extensions/libarchive/CompressContent.h            |  4 +-
 extensions/libarchive/FocusArchiveEntry.cpp        |  6 +--
 extensions/libarchive/FocusArchiveEntry.h          |  2 +-
 extensions/libarchive/MergeContent.h               |  6 +--
 extensions/libarchive/UnfocusArchiveEntry.cpp      |  4 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |  2 +-
 extensions/librdkafka/PublishKafka.cpp             |  4 +-
 extensions/mqtt/processors/ConsumeMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.cpp         |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  2 +-
 extensions/opencv/CaptureRTSPFrame.cpp             | 14 ++----
 extensions/opencv/MotionDetector.cpp               |  2 +-
 .../lua/{LuaBaseStream.cpp => LuaInputStream.cpp}  | 22 +++-------
 .../lua/{LuaBaseStream.h => LuaInputStream.h}      | 27 +++---------
 .../script/lua/LuaOutputStream.cpp                 | 28 ++++++------
 .../lua/{LuaBaseStream.h => LuaOutputStream.h}     | 26 +++--------
 extensions/script/lua/LuaProcessSession.cpp        | 22 +++-------
 extensions/script/lua/LuaProcessSession.h          | 17 +++-----
 extensions/script/lua/LuaScriptEngine.cpp          | 10 +++--
 .../python/{PyBaseStream.cpp => PyInputStream.cpp} | 12 ++----
 .../python/{PyBaseStream.h => PyInputStream.h}     |  9 ++--
 .../python/{PyBaseStream.h => PyOutputStream.cpp}  | 27 +++++-------
 .../python/{PyBaseStream.h => PyOutputStream.h}    | 10 ++---
 extensions/script/python/PyProcessSession.cpp      |  8 ++--
 extensions/script/python/PyProcessSession.h        |  3 +-
 extensions/script/python/PythonBindings.h          | 13 +++---
 extensions/sftp/client/SFTPClient.cpp              | 24 +++++------
 extensions/sftp/client/SFTPClient.h                |  6 +--
 extensions/sftp/processors/FetchSFTP.cpp           |  7 +--
 extensions/sftp/processors/PutSFTP.cpp             |  2 +-
 .../processors/ExecuteProcess.h                    |  1 -
 .../standard-processors/processors/ExtractText.cpp |  2 +-
 .../standard-processors/processors/ExtractText.h   |  2 +-
 .../standard-processors/processors/HashContent.cpp | 14 ++----
 .../standard-processors/processors/HashContent.h   | 14 +++---
 .../standard-processors/processors/PutFile.cpp     |  4 +-
 .../standard-processors/processors/PutFile.h       | 21 +++------
 .../standard-processors/processors/RouteText.cpp   |  2 +-
 .../standard-processors/processors/TailFile.cpp    |  8 ++--
 extensions/tensorflow/TFApplyGraph.cpp             |  6 +--
 extensions/tensorflow/TFApplyGraph.h               |  8 ++--
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  4 +-
 extensions/tensorflow/TFConvertImageToTensor.h     |  6 ++-
 extensions/tensorflow/TFExtractTopLabels.cpp       |  4 +-
 extensions/tensorflow/TFExtractTopLabels.h         |  6 ++-
 extensions/usb-camera/GetUSBCamera.cpp             |  2 +-
 extensions/usb-camera/GetUSBCamera.h               |  2 +-
 libminifi/include/FlowFileRecord.h                 |  1 -
 libminifi/include/c2/PayloadSerializer.h           | 15 ++++---
 libminifi/include/core/ContentRepository.h         |  1 -
 libminifi/include/core/ProcessSession.h            |  6 +--
 .../include/core/ProcessSessionReadCallback.h      | 23 +++-------
 libminifi/include/io/BaseStream.h                  | 12 +-----
 libminifi/include/io/CRCStream.h                   |  3 +-
 libminifi/include/io/StreamCallback.h              |  9 ++--
 libminifi/include/io/StreamPipe.h                  |  7 +--
 libminifi/include/io/StreamSlice.h                 | 12 +++---
 libminifi/include/io/ZlibStream.h                  |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 19 ++------
 libminifi/include/utils/ByteArrayCallback.h        | 20 ++-------
 libminifi/include/utils/FileReaderCallback.h       | 14 ++----
 libminifi/include/utils/JsonCallback.h             |  6 +--
 .../utils/LineByLineInputOutputStreamCallback.h    |  5 ++-
 libminifi/src/core/ProcessSession.cpp              | 14 +++---
 libminifi/src/core/ProcessSessionReadCallback.cpp  | 21 +++------
 libminifi/src/io/StreamSlice.cpp                   |  2 +-
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  6 +--
 libminifi/src/utils/ByteArrayCallback.cpp          | 29 ++-----------
 libminifi/src/utils/FileReaderCallback.cpp         | 14 ++----
 .../utils/LineByLineInputOutputStreamCallback.cpp  |  2 +-
 .../test/archive-tests/CompressContentTests.cpp    |  2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |  4 +-
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  4 +-
 libminifi/test/rocksdb-tests/SwapTests.cpp         |  2 +-
 libminifi/test/unit/CRCTests.cpp                   | 22 +++++-----
 .../test/unit/ContentRepositoryDependentTests.h    |  2 +-
 libminifi/test/unit/FlowFileSerializationTests.cpp |  1 -
 libminifi/test/unit/SerializationTests.cpp         |  1 -
 libminifi/test/unit/StreamTests.cpp                |  8 ++--
 nanofi/include/cxx/CallbackProcessor.h             |  1 -
 nanofi/include/cxx/ReflexiveSession.h              | 50 +++++++---------------
 108 files changed, 325 insertions(+), 534 deletions(-)

diff --git a/controller/MiNiFiController.cpp b/controller/MiNiFiController.cpp
index 6f55e9b45..fd3438859 100644
--- a/controller/MiNiFiController.cpp
+++ b/controller/MiNiFiController.cpp
@@ -23,7 +23,6 @@
 #include <queue>
 #include <map>
 #include <iostream>
-#include "io/BaseStream.h"
 
 #include "core/Core.h"
 
diff --git a/extensions/aws/processors/FetchS3Object.cpp b/extensions/aws/processors/FetchS3Object.cpp
index 9ee0fd27a..10166f58d 100644
--- a/extensions/aws/processors/FetchS3Object.cpp
+++ b/extensions/aws/processors/FetchS3Object.cpp
@@ -28,12 +28,7 @@
 #include "core/Resource.h"
 #include "utils/OptionalUtils.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace aws {
-namespace processors {
+namespace org::apache::nifi::minifi::aws::processors {
 
 void FetchS3Object::initialize() {
   setSupportedProperties(properties());
@@ -90,7 +85,7 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   }
 
   std::optional<minifi::aws::s3::GetObjectResult> result;
-  session->write(flow_file, [&get_object_params, &result, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+  session->write(flow_file, [&get_object_params, &result, this](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
     result = s3_wrapper_.getObject(*get_object_params, *stream);
     return (result | minifi::utils::map(&s3::GetObjectResult::write_size)).value_or(0);
   });
@@ -120,9 +115,4 @@ void FetchS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   }
 }
 
-}  // namespace processors
-}  // namespace aws
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::aws::processors
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index f04f89671..42ef343f3 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -101,7 +101,7 @@ class PutS3Object : public S3Processor {
       , s3_wrapper_(s3_wrapper) {
     }
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
       if (flow_size_ > MAX_SIZE) {
         return -1;
       }
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index 6e76676ba..e8f573c8d 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -74,7 +74,7 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream) {
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
   Aws::S3::Model::PutObjectRequest request;
   request.SetBucket(put_object_params.bucket);
   request.SetKey(put_object_params.object_key);
@@ -116,7 +116,7 @@ bool S3Wrapper::deleteObject(const DeleteObjectRequestParameters& params) {
   return request_sender_->sendDeleteObjectRequest(request, params.credentials, params.client_config);
 }
 
-int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output) {
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::OutputStream& output) {
   std::vector<uint8_t> buffer(4096);
   size_t write_size = 0;
   if (data_size < 0) return 0;
@@ -134,7 +134,7 @@ int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_si
   return gsl::narrow<int64_t>(write_size);
 }
 
-std::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& out_body) {
+std::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body) {
   auto request = createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
   auto aws_result = request_sender_->sendGetObjectRequest(request, get_object_params.credentials, get_object_params.client_config);
   if (!aws_result) {
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 5424319e1..676857f82 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -40,7 +40,6 @@
 #include "utils/StringUtils.h"
 #include "utils/ListingStateManager.h"
 #include "utils/gsl.h"
-#include "io/BaseStream.h"
 #include "S3RequestSender.h"
 
 namespace org::apache::nifi::minifi::aws::s3 {
@@ -200,9 +199,9 @@ class S3Wrapper {
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
   bool deleteObject(const DeleteObjectRequestParameters& params);
-  std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::BaseStream& out_body);
+  std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
@@ -213,7 +212,7 @@ class S3Wrapper {
   static Expiration getExpiration(const std::string& expiration);
 
   void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
-  static int64_t writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::BaseStream& output);
+  static int64_t writeFetchedBody(Aws::IOStream& source, const int64_t data_size, io::OutputStream& output);
   static std::string getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption);
 
   std::optional<std::vector<ListedObjectAttributes>> listVersions(const ListRequestParameters& params);
diff --git a/extensions/azure/processors/FetchAzureBlobStorage.cpp b/extensions/azure/processors/FetchAzureBlobStorage.cpp
index 0ed7ab951..ecd67e748 100644
--- a/extensions/azure/processors/FetchAzureBlobStorage.cpp
+++ b/extensions/azure/processors/FetchAzureBlobStorage.cpp
@@ -70,7 +70,7 @@ void FetchAzureBlobStorage::onTrigger(const std::shared_ptr<core::ProcessContext
 
   auto fetched_flow_file = session->create(flow_file);
   std::optional<int64_t> result_size;
-  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
     result_size = azure_blob_storage_.fetchBlob(*params, *stream);
     if (!result_size) {
       return 0;
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
index 06de9ca8d..2b2ff5188 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -74,7 +74,7 @@ void FetchAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessCon
 
   auto fetched_flow_file = session->create(flow_file);
   std::optional<uint64_t> result;
-  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+  session->write(fetched_flow_file, [&, this](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
     result = azure_data_lake_storage_.fetchFile(*params, *output_stream);
     if (!result) {
       return 0;
diff --git a/extensions/azure/processors/PutAzureBlobStorage.h b/extensions/azure/processors/PutAzureBlobStorage.h
index 619269f04..3f9574deb 100644
--- a/extensions/azure/processors/PutAzureBlobStorage.h
+++ b/extensions/azure/processors/PutAzureBlobStorage.h
@@ -73,7 +73,7 @@ class PutAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorBase
       , params_(params) {
     }
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
       std::vector<std::byte> buffer;
       buffer.resize(flow_size_);
       size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index ccf5c6754..027d51524 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -116,7 +116,7 @@ PutAzureDataLakeStorage::ReadCallback::ReadCallback(
     logger_(std::move(logger)) {
 }
 
-int64_t PutAzureDataLakeStorage::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutAzureDataLakeStorage::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
   std::vector<std::byte> buffer;
   buffer.resize(flow_size_);
   size_t read_ret = stream->read(buffer);
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index ac674dbcf..8fa3fffc4 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -75,7 +75,7 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
   class ReadCallback {
    public:
     ReadCallback(uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger);
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
 
     [[nodiscard]] storage::UploadDataLakeStorageResult getResult() const {
       return result_;
diff --git a/extensions/azure/storage/AzureBlobStorage.cpp b/extensions/azure/storage/AzureBlobStorage.cpp
index d881ecdb1..a5d9ec5b7 100644
--- a/extensions/azure/storage/AzureBlobStorage.cpp
+++ b/extensions/azure/storage/AzureBlobStorage.cpp
@@ -75,7 +75,7 @@ bool AzureBlobStorage::deleteBlob(const DeleteAzureBlobStorageParameters& params
   }
 }
 
-std::optional<uint64_t> AzureBlobStorage::fetchBlob(const FetchAzureBlobStorageParameters& params, io::BaseStream& stream) {
+std::optional<uint64_t> AzureBlobStorage::fetchBlob(const FetchAzureBlobStorageParameters& params, io::OutputStream& stream) {
   try {
     auto fetch_res = blob_storage_client_->fetchBlob(params);
     return internal::pipe(*fetch_res, stream);
diff --git a/extensions/azure/storage/AzureBlobStorage.h b/extensions/azure/storage/AzureBlobStorage.h
index 5b1acb804..8b7f9b16e 100644
--- a/extensions/azure/storage/AzureBlobStorage.h
+++ b/extensions/azure/storage/AzureBlobStorage.h
@@ -66,7 +66,7 @@ class AzureBlobStorage {
   std::optional<bool> createContainerIfNotExists(const PutAzureBlobStorageParameters& params);
   std::optional<UploadBlobResult> uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer);
   bool deleteBlob(const DeleteAzureBlobStorageParameters& params);
-  std::optional<uint64_t> fetchBlob(const FetchAzureBlobStorageParameters& params, io::BaseStream& stream);
+  std::optional<uint64_t> fetchBlob(const FetchAzureBlobStorageParameters& params, io::OutputStream& stream);
   std::optional<ListContainerResult> listContainer(const ListAzureBlobStorageParameters& params);
 
  private:
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 1546b8f05..b53bc201a 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -92,7 +92,7 @@ bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters
   }
 }
 
-std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream) {
+std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::OutputStream& stream) {
   try {
     auto result = data_lake_storage_client_->fetchFile(params);
     return internal::pipe(*result, stream);
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h b/extensions/azure/storage/AzureDataLakeStorage.h
index 8495a4d18..d7e05f7db 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -69,7 +69,7 @@ class AzureDataLakeStorage {
 
   storage::UploadDataLakeStorageResult uploadFile(const storage::PutAzureDataLakeStorageParameters& params, gsl::span<const std::byte> buffer);
   bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
-  std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& params, io::BaseStream& stream);
+  std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& params, io::OutputStream& stream);
   std::optional<ListDataLakeStorageResult> listDirectory(const ListAzureDataLakeStorageParameters& params);
 
  private:
diff --git a/extensions/coap/protocols/CoapC2Protocol.cpp b/extensions/coap/protocols/CoapC2Protocol.cpp
index 74c4421f5..a08b5fd72 100644
--- a/extensions/coap/protocols/CoapC2Protocol.cpp
+++ b/extensions/coap/protocols/CoapC2Protocol.cpp
@@ -19,7 +19,7 @@
 #include "c2/PayloadParser.h"
 #include "coap_functions.h"
 #include "coap_message.h"
-#include "io/BaseStream.h"
+#include "io/OutputStream.h"
 #include "core/Resource.h"
 #include "utils/gsl.h"
 
@@ -50,7 +50,7 @@ minifi::c2::C2Payload CoapProtocol::consumePayload(const std::string &url, const
   return RESTSender::consumePayload(url, payload, direction, false);
 }
 
-int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
+int CoapProtocol::writeAcknowledgement(io::OutputStream *stream, const minifi::c2::C2Payload &payload) {
   auto ident = payload.getIdentifier();
   auto state = payload.getStatus().getState();
   stream->write(ident);
@@ -78,7 +78,7 @@ int CoapProtocol::writeAcknowledgement(io::BaseStream *stream, const minifi::c2:
   return 0;
 }
 
-int CoapProtocol::writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload) {
+int CoapProtocol::writeHeartbeat(io::OutputStream *stream, const minifi::c2::C2Payload &payload) {
   bool byte;
   uint16_t size = 0;
 
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
index c39275522..b1b9de93d 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -28,7 +28,7 @@
 #include <mutex>
 
 #include "c2/C2Protocol.h"
-#include "io/BaseStream.h"
+#include "io/OutputStream.h"
 #include "agent/agent_version.h"
 #include "CoapConnector.h"
 
@@ -92,20 +92,20 @@ class CoapProtocol : public minifi::c2::RESTSender {
   static minifi::c2::Operation getOperation(int type);
 
   /**
-   * Writes a heartbeat to the provided BaseStream ptr.
-   * @param stream BaseStream
+   * Writes a heartbeat to the provided OutputStream ptr.
+   * @param stream OutputStream
    * @param payload payload to serialize
    * @return result 0 if success failure otherwise
    */
-  int writeHeartbeat(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+  int writeHeartbeat(io::OutputStream *stream, const minifi::c2::C2Payload &payload);
 
   /**
-   * Writes a acknowledgement to the provided BaseStream ptr.
-   * @param stream BaseStream
+   * Writes a acknowledgement to the provided OutputStream ptr.
+   * @param stream OutputStream
    * @param payload payload to serialize
    * @return result 0 if success failure otherwise
    */
-  static int writeAcknowledgement(io::BaseStream *stream, const minifi::c2::C2Payload &payload);
+  static int writeAcknowledgement(io::OutputStream *stream, const minifi::c2::C2Payload &payload);
 
   minifi::c2::C2Payload serialize(const minifi::c2::C2Payload &payload);
 
diff --git a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index 6bc87aaec..703f5cff3 100644
--- a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -52,7 +52,6 @@
 #include "processors/LogAttribute.h"
 #include "CoapC2Protocol.h"
 #include "CoapServer.h"
-#include "io/BaseStream.h"
 #include "concurrentqueue.h"
 #include "utils/IntegrationTestUtils.h"
 
diff --git a/extensions/gcp/processors/FetchGCSObject.cpp b/extensions/gcp/processors/FetchGCSObject.cpp
index 33f4b85d9..13ad382a3 100644
--- a/extensions/gcp/processors/FetchGCSObject.cpp
+++ b/extensions/gcp/processors/FetchGCSObject.cpp
@@ -37,7 +37,7 @@ class FetchFromGCSCallback {
         client_(client) {
   }
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) {
     auto reader = client_.ReadObject(bucket_, key_, encryption_key_, generation_, gcs::IfGenerationNotMatch(0));
     auto set_members = gsl::finally([&]{
       status_ = reader.status();
diff --git a/extensions/gcp/processors/PutGCSObject.cpp b/extensions/gcp/processors/PutGCSObject.cpp
index 2bee2ae43..52a3c433f 100644
--- a/extensions/gcp/processors/PutGCSObject.cpp
+++ b/extensions/gcp/processors/PutGCSObject.cpp
@@ -37,7 +37,7 @@ class UploadToGCSCallback {
         client_(client) {
   }
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
     std::string content;
     content.resize(stream->size());
     const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>());
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 9c4a20c08..8f95712bc 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -65,7 +65,7 @@ class HttpStreamingCallback final : public utils::HTTPUploadCallback {
     seekInner(lock, pos);
   }
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) override {
     std::vector<std::byte> vec;
 
     if (stream->size() > 0) {
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.cpp b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
index 9c7b01809..4e713bd45 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.cpp
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.cpp
@@ -164,7 +164,7 @@ int HttpSiteToSiteClient::readResponse(const std::shared_ptr<sitetosite::Transac
   return SiteToSiteClient::readResponse(transaction, code, message);
 }
 // write respond
-int HttpSiteToSiteClient::writeResponse(const std::shared_ptr<sitetosite::Transaction> &transaction, sitetosite::RespondCode code, std::string message) {
+int HttpSiteToSiteClient::writeResponse(const std::shared_ptr<sitetosite::Transaction> &transaction, sitetosite::RespondCode code, const std::string& message) {
   current_code = code;
   if (code == sitetosite::CONFIRM_TRANSACTION || code == sitetosite::FINISH_TRANSACTION) {
     return 1;
diff --git a/extensions/http-curl/sitetosite/HTTPProtocol.h b/extensions/http-curl/sitetosite/HTTPProtocol.h
index 19fd8b38a..b313f7993 100644
--- a/extensions/http-curl/sitetosite/HTTPProtocol.h
+++ b/extensions/http-curl/sitetosite/HTTPProtocol.h
@@ -88,7 +88,7 @@ class HttpSiteToSiteClient : public sitetosite::SiteToSiteClient {
 
   int readResponse(const std::shared_ptr<sitetosite::Transaction> &transaction, sitetosite::RespondCode &code, std::string &message) override;
   // write respond
-  int writeResponse(const std::shared_ptr<sitetosite::Transaction> &transaction, sitetosite::RespondCode code, std::string message) override;
+  int writeResponse(const std::shared_ptr<sitetosite::Transaction> &transaction, sitetosite::RespondCode code, const std::string& message) override;
 
   /**
    * Bootstrapping is not really required for the HTTP Site To Site so we will set the peer state and return true.
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 16d5c3c5d..a963503ee 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -19,7 +19,6 @@
 
 #include <memory>
 
-#include "io/BaseStream.h"
 #include "civetweb.h"
 #include "utils/gsl.h"
 
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index a49090e72..c0f8a9ad2 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -301,7 +301,7 @@ class FlowFileResponder : public ServerAwareHandler {
           "Connection: close\r\n\r\n",
           total);
       minifi::io::BufferStream serializer;
-      minifi::io::CRCStream < minifi::io::BaseStream > stream(gsl::make_not_null(&serializer));
+      minifi::io::CRCStream <minifi::io::OutputStream> stream(gsl::make_not_null(&serializer));
       for (const auto& flow : flows) {
         uint32_t num_attributes = gsl::narrow<uint32_t>(flow->attributes.size());
         stream.write(num_attributes);
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 9d04be6c4..4fcad4071 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -19,7 +19,6 @@
 #include <memory>
 #include <utility>
 #include <string>
-#include "io/BaseStream.h"
 #include "TestBase.h"
 #include "Catch.h"
 #include "core/Core.h"
diff --git a/extensions/jni/jvm/JniProcessSession.h b/extensions/jni/jvm/JniProcessSession.h
index 8bd286427..534466b3f 100644
--- a/extensions/jni/jvm/JniProcessSession.h
+++ b/extensions/jni/jvm/JniProcessSession.h
@@ -25,7 +25,6 @@
 #include <iterator>
 #include <algorithm>
 
-#include "io/BaseStream.h"
 #include "FlowFileRecord.h"
 #include "core/ProcessSession.h"
 #include "core/ProcessSessionFactory.h"
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 366e89546..1d16a1f81 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -105,7 +105,7 @@ class JniByteInputStream {
       : buffer_(size),
         read_size_(0) {
   }
-  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<minifi::io::InputStream>& stream) {
     stream_ = stream;
     return 0;
   }
@@ -145,7 +145,7 @@ class JniByteInputStream {
     return stream_->read(arr);
   }
 
-  std::shared_ptr<minifi::io::BaseStream> stream_;
+  std::shared_ptr<minifi::io::InputStream> stream_;
   std::vector<std::byte> buffer_;
   uint64_t read_size_;
 };
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 9694fe30d..d3ac8531e 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -112,14 +112,14 @@ class CompressContent : public core::Processor {
     std::shared_ptr<core::ProcessSession> session_;
     bool success_{false};
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
+    int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) {
       std::shared_ptr<io::ZlibBaseStream> filterStream;
       if (compress_mode_ == CompressionMode::Compress) {
         filterStream = std::make_shared<io::ZlibCompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP, compress_level_);
       } else {
         filterStream = std::make_shared<io::ZlibDecompressStream>(gsl::make_not_null(output_stream.get()), io::ZlibCompressionFormat::GZIP);
       }
-      session_->read(flow_, [this, &filterStream](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+      session_->read(flow_, [this, &filterStream](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
         std::vector<std::byte> buffer(16 * 1024U);
         size_t read_size = 0;
         while (read_size < flow_->getSize()) {
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 9d11dd7c9..cd057f2d9 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -85,7 +85,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
   }
 
   // Restore target archive entry
-  if (targetEntryStashKey != "") {
+  if (!targetEntryStashKey.empty()) {
     session->restore(targetEntryStashKey, flowFile);
   } else {
     logger_->log_warn("FocusArchiveEntry failed to locate target entry: %s",
@@ -127,7 +127,7 @@ void FocusArchiveEntry::onTrigger(core::ProcessContext *context, core::ProcessSe
 }
 
 struct FocusArchiveEntryReadData {
-  std::shared_ptr<io::BaseStream> stream;
+  std::shared_ptr<io::InputStream> stream;
   core::Processor *processor;
   std::array<std::byte, 8196> buf;
 };
@@ -152,7 +152,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
   return gsl::narrow<la_ssize_t>(read);
 }
 
-int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+int64_t FocusArchiveEntry::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) const {
   auto inputArchive = archive_read_new();
   struct archive_entry *entry;
   int64_t nlen = 0;
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 8db05eeb4..944de6661 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -62,7 +62,7 @@ class FocusArchiveEntry : public core::Processor {
   class ReadCallback {
    public:
     explicit ReadCallback(core::Processor*, utils::file::FileManager *file_man, ArchiveMetadata *archiveMetadata);
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const;
     bool isRunning() {return proc_->isRunning();}
 
    private:
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index 40bf1096b..7880c13da 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -78,7 +78,7 @@ class BinaryConcatenationMerge : public MergeBin {
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
     FlowFileSerializer& serializer_;
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
       size_t write_size_sum = 0;
       if (!header_.empty()) {
         const auto write_ret = stream->write(reinterpret_cast<const uint8_t*>(header_.data()), header_.size());
@@ -164,7 +164,7 @@ class ArchiveMerge {
 
     std::string merge_type_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
-    std::shared_ptr<io::BaseStream> stream_;
+    std::shared_ptr<io::OutputStream> stream_;
     size_t size_;
     std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ArchiveMerge>::getLogger();
     FlowFileSerializer& serializer_;
@@ -190,7 +190,7 @@ class ArchiveMerge {
       return totalWrote;
     }
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) {
       struct archive *arch;
 
       arch = archive_write_new();
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index 02222b98b..fe1f86f36 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -140,7 +140,7 @@ UnfocusArchiveEntry::WriteCallback::WriteCallback(ArchiveMetadata *archiveMetada
 }
 
 struct UnfocusArchiveEntryWriteData {
-  std::shared_ptr<io::BaseStream> stream;
+  std::shared_ptr<io::OutputStream> stream;
 };
 
 la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *d, const void *buffer, size_t length) {
@@ -149,7 +149,7 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *
   return io::isError(write_ret) ? -1 : gsl::narrow<la_ssize_t>(write_ret);
 }
 
-int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+int64_t UnfocusArchiveEntry::WriteCallback::operator()(const std::shared_ptr<io::OutputStream>& stream) const {
   auto outputArchive = archive_write_new();
   int64_t nlen = 0;
 
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index 587109afa..65733817a 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -64,7 +64,7 @@ class UnfocusArchiveEntry : public core::Processor {
   class WriteCallback {
    public:
     explicit WriteCallback(ArchiveMetadata *archiveMetadata);
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
+    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const;
    private:
     //! Logger
     std::shared_ptr<Logger> logger_ = core::logging::LoggerFactory<UnfocusArchiveEntry>::getLogger();
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 470ed661c..6c141fc24 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -258,7 +258,7 @@ class ReadCallback {
   ReadCallback(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback) = delete;
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
     std::vector<std::byte> buffer;
 
     buffer.resize(max_seg_size_);
@@ -734,7 +734,7 @@ void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
     context->getProperty(FailEmptyFlowFiles.getName(), failEmptyFlowFiles);
 
     ReadCallback callback(max_flow_seg_size_, kafkaKey, thisTopic->getTopic(), conn_->getConnection(), *flowFile,
-                                        attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
+                          attributeNameRegex_, messages, flow_file_index, failEmptyFlowFiles, logger_);
     session->read(flowFile, std::ref(callback));
 
     if (!callback.called_) {
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index 2de3fdf08..ba82871ff 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -80,7 +80,7 @@ void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
     const auto& message = msg_queue.front();
     std::shared_ptr<core::FlowFile> processFlowFile = session->create();
     int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
+    session->write(processFlowFile, [&message, &write_status](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
       if (message->payloadlen < 0) {
         write_status = -1;
         return -1;
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index e73544016..218a4220a 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -70,7 +70,7 @@ void PublishMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& /*conte
   }
 }
 
-int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
   if (flow_size_ < max_seg_size_)
     max_seg_size_ = flow_size_;
   gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index f940b73b5..38736ba2d 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -74,7 +74,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
           retain_(retain) {
     }
 
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
 
     size_t read_size_ = 0;
     int status_ = 0;
diff --git a/extensions/opencv/CaptureRTSPFrame.cpp b/extensions/opencv/CaptureRTSPFrame.cpp
index 8f1d87a1c..eb0968ade 100644
--- a/extensions/opencv/CaptureRTSPFrame.cpp
+++ b/extensions/opencv/CaptureRTSPFrame.cpp
@@ -20,11 +20,7 @@
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property CaptureRTSPFrame::RTSPUsername(
     "RTSP Username",
@@ -140,7 +136,7 @@ void CaptureRTSPFrame::onTrigger(const std::shared_ptr<core::ProcessContext> &co
       session->putAttribute(flow_file, "filename", filename);
       session->putAttribute(flow_file, "video.backend.driver", video_backend_driver_);
 
-      session->write(flow_file, [&frame, this](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+      session->write(flow_file, [&frame, this](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
         std::vector<uchar> image_buf;
         imencode(image_encoding_, frame, image_buf);
         const auto ret = output_stream->write(image_buf.data(), image_buf.size());
@@ -163,8 +159,4 @@ void CaptureRTSPFrame::notifyStop() {
 
 REGISTER_RESOURCE(CaptureRTSPFrame, Processor);
 
-} /* namespace processors */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/opencv/MotionDetector.cpp b/extensions/opencv/MotionDetector.cpp
index 2a1d5bb17..75c7cf5be 100644
--- a/extensions/opencv/MotionDetector.cpp
+++ b/extensions/opencv/MotionDetector.cpp
@@ -152,7 +152,7 @@ void MotionDetector::onTrigger(const std::shared_ptr<core::ProcessContext> &cont
   }
   cv::Mat frame;
 
-  session->read(flow_file, [&frame](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+  session->read(flow_file, [&frame](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
     std::vector<uchar> image_buf;
     image_buf.resize(input_stream->size());
     const auto ret = input_stream->read(gsl::make_span(image_buf).as_span<std::byte>());
diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaInputStream.cpp
similarity index 77%
rename from extensions/script/lua/LuaBaseStream.cpp
rename to extensions/script/lua/LuaInputStream.cpp
index 76b75619e..5e7e4a466 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaInputStream.cpp
@@ -21,19 +21,15 @@
 #include <utility>
 #include <string>
 
-#include "LuaBaseStream.h"
+#include "LuaInputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace lua {
+namespace org::apache::nifi::minifi::lua {
 
-LuaBaseStream::LuaBaseStream(std::shared_ptr<io::BaseStream> stream)
+LuaInputStream::LuaInputStream(std::shared_ptr<io::InputStream> stream)
     : stream_(std::move(stream)) {
 }
 
-std::string LuaBaseStream::read(size_t len) {
+std::string LuaInputStream::read(size_t len) {
   if (len == 0) {
     len = stream_->size();
   }
@@ -59,12 +55,4 @@ std::string LuaBaseStream::read(size_t len) {
   return io::isError(read) ? std::string{} : buffer;
 }
 
-size_t LuaBaseStream::write(std::string buf) {
-  return stream_->write(reinterpret_cast<const uint8_t*>(buf.data()), buf.length());
-}
-
-} /* namespace lua */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/extensions/script/lua/LuaBaseStream.h b/extensions/script/lua/LuaInputStream.h
similarity index 67%
copy from extensions/script/lua/LuaBaseStream.h
copy to extensions/script/lua/LuaInputStream.h
index 47b5017df..24df118c1 100644
--- a/extensions/script/lua/LuaBaseStream.h
+++ b/extensions/script/lua/LuaInputStream.h
@@ -21,17 +21,13 @@
 #include <string>
 
 #include "sol/sol.hpp"
-#include "io/BaseStream.h"
+#include "io/InputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace lua {
+namespace org::apache::nifi::minifi::lua {
 
-class LuaBaseStream {
+class LuaInputStream {
  public:
-  explicit LuaBaseStream(std::shared_ptr<io::BaseStream> stream);
+  explicit LuaInputStream(std::shared_ptr<io::InputStream> stream);
 
   /**
    * Read n bytes of data (returns string, to follow Lua idioms)
@@ -39,19 +35,8 @@ class LuaBaseStream {
    */
   std::string read(size_t len = 0);
 
-  /**
-   * Write data (receives string, to follow Lua idioms)
-   * @param buf
-   * @return
-   */
-  size_t write(std::string buf);
-
  private:
-  std::shared_ptr<io::BaseStream> stream_;
+  std::shared_ptr<io::InputStream> stream_;
 };
 
-} /* namespace lua */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/libminifi/src/io/BaseStream.cpp b/extensions/script/lua/LuaOutputStream.cpp
similarity index 66%
rename from libminifi/src/io/BaseStream.cpp
rename to extensions/script/lua/LuaOutputStream.cpp
index 013747376..92e91ee16 100644
--- a/libminifi/src/io/BaseStream.cpp
+++ b/extensions/script/lua/LuaOutputStream.cpp
@@ -15,19 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "io/BaseStream.h"
-#include <vector>
+
+#include <memory>
+#include <utility>
 #include <string>
-#include "core/expect.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
+#include "LuaOutputStream.h"
+
+namespace org::apache::nifi::minifi::lua {
+
+LuaOutputStream::LuaOutputStream(std::shared_ptr<io::OutputStream> stream)
+    : stream_(std::move(stream)) {
+}
+
+size_t LuaOutputStream::write(std::string buf) {
+  return stream_->write(reinterpret_cast<const uint8_t*>(buf.data()), buf.length());
+}
 
-} /* namespace io */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/extensions/script/lua/LuaBaseStream.h b/extensions/script/lua/LuaOutputStream.h
similarity index 67%
rename from extensions/script/lua/LuaBaseStream.h
rename to extensions/script/lua/LuaOutputStream.h
index 47b5017df..8bd8c49a6 100644
--- a/extensions/script/lua/LuaBaseStream.h
+++ b/extensions/script/lua/LuaOutputStream.h
@@ -21,23 +21,13 @@
 #include <string>
 
 #include "sol/sol.hpp"
-#include "io/BaseStream.h"
+#include "io/OutputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace lua {
+namespace org::apache::nifi::minifi::lua {
 
-class LuaBaseStream {
+class LuaOutputStream {
  public:
-  explicit LuaBaseStream(std::shared_ptr<io::BaseStream> stream);
-
-  /**
-   * Read n bytes of data (returns string, to follow Lua idioms)
-   * @return
-   */
-  std::string read(size_t len = 0);
+  explicit LuaOutputStream(std::shared_ptr<io::OutputStream> stream);
 
   /**
    * Write data (receives string, to follow Lua idioms)
@@ -47,11 +37,7 @@ class LuaBaseStream {
   size_t write(std::string buf);
 
  private:
-  std::shared_ptr<io::BaseStream> stream_;
+  std::shared_ptr<io::OutputStream> stream_;
 };
 
-} /* namespace lua */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/extensions/script/lua/LuaProcessSession.cpp b/extensions/script/lua/LuaProcessSession.cpp
index 7b9916b8c..44a683711 100644
--- a/extensions/script/lua/LuaProcessSession.cpp
+++ b/extensions/script/lua/LuaProcessSession.cpp
@@ -20,11 +20,7 @@
 
 #include "LuaProcessSession.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace lua {
+namespace org::apache::nifi::minifi::lua {
 
 LuaProcessSession::LuaProcessSession(std::shared_ptr<core::ProcessSession> session)
     : session_(std::move(session)) {
@@ -48,7 +44,7 @@ std::shared_ptr<script::ScriptFlowFile> LuaProcessSession::get() {
 }
 
 void LuaProcessSession::transfer(const std::shared_ptr<script::ScriptFlowFile> &script_flow_file,
-                                 core::Relationship relationship) {
+                                 const core::Relationship& relationship) {
   if (!session_) {
     throw std::runtime_error("Access of ProcessSession after it has been released");
   }
@@ -74,9 +70,9 @@ void LuaProcessSession::read(const std::shared_ptr<script::ScriptFlowFile> &scri
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
     sol::function callback = input_stream_callback["process"];
-    return callback(input_stream_callback, std::make_shared<LuaBaseStream>(input_stream));
+    return callback(input_stream_callback, std::make_shared<LuaInputStream>(input_stream));
   });
 }
 
@@ -92,9 +88,9 @@ void LuaProcessSession::write(const std::shared_ptr<script::ScriptFlowFile> &scr
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
     sol::function callback = output_stream_callback["process"];
-    return callback(output_stream_callback, std::make_shared<LuaBaseStream>(output_stream));
+    return callback(output_stream_callback, std::make_shared<LuaOutputStream>(output_stream));
   });
 }
 
@@ -135,8 +131,4 @@ void LuaProcessSession::releaseCoreResources() {
   session_.reset();
 }
 
-} /* namespace lua */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/extensions/script/lua/LuaProcessSession.h b/extensions/script/lua/LuaProcessSession.h
index 27c4709e1..c428239fb 100644
--- a/extensions/script/lua/LuaProcessSession.h
+++ b/extensions/script/lua/LuaProcessSession.h
@@ -24,13 +24,10 @@
 #include "../ScriptFlowFile.h"
 
 #include "sol/sol.hpp"
-#include "LuaBaseStream.h"
+#include "LuaInputStream.h"
+#include "LuaOutputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace lua {
+namespace org::apache::nifi::minifi::lua {
 
 class LuaProcessSession {
  public:
@@ -39,7 +36,7 @@ class LuaProcessSession {
   std::shared_ptr<script::ScriptFlowFile> get();
   std::shared_ptr<script::ScriptFlowFile> create();
   std::shared_ptr<script::ScriptFlowFile> create(const std::shared_ptr<script::ScriptFlowFile> &flow_file);
-  void transfer(const std::shared_ptr<script::ScriptFlowFile> &flow_file, core::Relationship relationship);
+  void transfer(const std::shared_ptr<script::ScriptFlowFile> &flow_file, const core::Relationship& relationship);
   void read(const std::shared_ptr<script::ScriptFlowFile> &script_flow_file, sol::table input_stream_callback);
   void write(const std::shared_ptr<script::ScriptFlowFile> &flow_file, sol::table output_stream_callback);
 
@@ -58,8 +55,4 @@ class LuaProcessSession {
   std::shared_ptr<core::ProcessSession> session_;
 };
 
-} /* namespace lua */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+}  // namespace org::apache::nifi::minifi::lua
diff --git a/extensions/script/lua/LuaScriptEngine.cpp b/extensions/script/lua/LuaScriptEngine.cpp
index 28142c760..309a89d80 100644
--- a/extensions/script/lua/LuaScriptEngine.cpp
+++ b/extensions/script/lua/LuaScriptEngine.cpp
@@ -54,10 +54,12 @@ LuaScriptEngine::LuaScriptEngine() {
       "removeAttribute", &script::ScriptFlowFile::removeAttribute,
       "updateAttribute", &script::ScriptFlowFile::updateAttribute,
       "setAttribute", &script::ScriptFlowFile::setAttribute);
-  lua_.new_usertype<lua::LuaBaseStream>(
-      "BaseStream",
-      "read", &lua::LuaBaseStream::read,
-      "write", &lua::LuaBaseStream::write);
+  lua_.new_usertype<lua::LuaInputStream>(
+      "InputStream",
+      "read", &lua::LuaInputStream::read);
+  lua_.new_usertype<lua::LuaOutputStream>(
+      "OutputStream",
+      "write", &lua::LuaOutputStream::write);
 }
 
 void LuaScriptEngine::executeScriptWithAppendedModulePaths(std::string& script) {
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyInputStream.cpp
similarity index 80%
rename from extensions/script/python/PyBaseStream.cpp
rename to extensions/script/python/PyInputStream.cpp
index bc51c1ec5..1bc74ceb3 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyInputStream.cpp
@@ -21,21 +21,21 @@
 #include <string>
 #include <vector>
 
-#include "PyBaseStream.h"
+#include "PyInputStream.h"
 
 #include "utils/gsl.h"
 
 namespace org::apache::nifi::minifi::python {
 
-PyBaseStream::PyBaseStream(std::shared_ptr<io::BaseStream> stream)
+PyInputStream::PyInputStream(std::shared_ptr<io::InputStream> stream)
     : stream_(std::move(stream)) {
 }
 
-py::bytes PyBaseStream::read() {
+py::bytes PyInputStream::read() {
   return read(stream_->size());
 }
 
-py::bytes PyBaseStream::read(size_t len) {
+py::bytes PyInputStream::read(size_t len) {
   if (len == 0) {
     len = stream_->size();
   }
@@ -50,8 +50,4 @@ py::bytes PyBaseStream::read(size_t len) {
   return {reinterpret_cast<char *>(buffer.data()), read};
 }
 
-size_t PyBaseStream::write(const py::bytes& buf) {
-  return stream_->write(gsl::make_span(static_cast<std::string>(buf)).as_span<const std::byte>());
-}
-
 }  // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyInputStream.h
similarity index 84%
copy from extensions/script/python/PyBaseStream.h
copy to extensions/script/python/PyInputStream.h
index ac8bc3dca..d6265a8dc 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyInputStream.h
@@ -21,22 +21,21 @@
 #include <memory>
 
 #include "pybind11/embed.h"
-#include "io/BaseStream.h"
+#include "io/InputStream.h"
 
 namespace org::apache::nifi::minifi::python {
 
 namespace py = pybind11;
 
-class PyBaseStream {
+class PyInputStream {
  public:
-  explicit PyBaseStream(std::shared_ptr<io::BaseStream> stream);
+  explicit PyInputStream(std::shared_ptr<io::InputStream> stream);
 
   py::bytes read();
   py::bytes read(size_t len = 0);
-  size_t write(const py::bytes& buf);
 
  private:
-  std::shared_ptr<io::BaseStream> stream_;
+  std::shared_ptr<io::InputStream> stream_;
 };
 
 }  // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyOutputStream.cpp
similarity index 72%
copy from extensions/script/python/PyBaseStream.h
copy to extensions/script/python/PyOutputStream.cpp
index ac8bc3dca..8f357ed76 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyOutputStream.cpp
@@ -16,27 +16,22 @@
  * limitations under the License.
  */
 
-#pragma once
-
 #include <memory>
+#include <utility>
+#include <string>
 
-#include "pybind11/embed.h"
-#include "io/BaseStream.h"
-
-namespace org::apache::nifi::minifi::python {
+#include "PyOutputStream.h"
 
-namespace py = pybind11;
+#include "utils/gsl.h"
 
-class PyBaseStream {
- public:
-  explicit PyBaseStream(std::shared_ptr<io::BaseStream> stream);
+namespace org::apache::nifi::minifi::python {
 
-  py::bytes read();
-  py::bytes read(size_t len = 0);
-  size_t write(const py::bytes& buf);
+PyOutputStream::PyOutputStream(std::shared_ptr<io::OutputStream> stream)
+    : stream_(std::move(stream)) {
+}
 
- private:
-  std::shared_ptr<io::BaseStream> stream_;
-};
+size_t PyOutputStream::write(const py::bytes& buf) {
+  return stream_->write(gsl::make_span(static_cast<std::string>(buf)).as_span<const std::byte>());
+}
 
 }  // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyOutputStream.h
similarity index 83%
rename from extensions/script/python/PyBaseStream.h
rename to extensions/script/python/PyOutputStream.h
index ac8bc3dca..c983aa6e2 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyOutputStream.h
@@ -21,22 +21,20 @@
 #include <memory>
 
 #include "pybind11/embed.h"
-#include "io/BaseStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::python {
 
 namespace py = pybind11;
 
-class PyBaseStream {
+class PyOutputStream {
  public:
-  explicit PyBaseStream(std::shared_ptr<io::BaseStream> stream);
+  explicit PyOutputStream(std::shared_ptr<io::OutputStream> stream);
 
-  py::bytes read();
-  py::bytes read(size_t len = 0);
   size_t write(const py::bytes& buf);
 
  private:
-  std::shared_ptr<io::BaseStream> stream_;
+  std::shared_ptr<io::OutputStream> stream_;
 };
 
 }  // namespace org::apache::nifi::minifi::python
diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp
index d9047e0e0..843d00323 100644
--- a/extensions/script/python/PyProcessSession.cpp
+++ b/extensions/script/python/PyProcessSession.cpp
@@ -76,8 +76,8 @@ void PyProcessSession::read(const std::shared_ptr<script::ScriptFlowFile>& scrip
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
-    return input_stream_callback.attr("process")(std::make_shared<PyBaseStream>(input_stream)).cast<int64_t>();
+  session_->read(flow_file, [&input_stream_callback](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
+    return input_stream_callback.attr("process")(std::make_shared<PyInputStream>(input_stream)).cast<int64_t>();
   });
 }
 
@@ -93,8 +93,8 @@ void PyProcessSession::write(const std::shared_ptr<script::ScriptFlowFile>& scri
     throw std::runtime_error("Access of FlowFile after it has been released");
   }
 
-  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
-    return output_stream_callback.attr("process")(std::make_shared<PyBaseStream>(output_stream)).cast<int64_t>();
+  session_->write(flow_file, [&output_stream_callback](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
+    return output_stream_callback.attr("process")(std::make_shared<PyOutputStream>(output_stream)).cast<int64_t>();
   });
 }
 
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 2aa5473da..898719854 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -25,7 +25,8 @@
 #include "pybind11/embed.h"
 #include "core/ProcessSession.h"
 #include "../ScriptFlowFile.h"
-#include "PyBaseStream.h"
+#include "PyInputStream.h"
+#include "PyOutputStream.h"
 
 #if defined(__GNUC__) || defined(__GNUG__)
 #pragma GCC visibility push(hidden)
diff --git a/extensions/script/python/PythonBindings.h b/extensions/script/python/PythonBindings.h
index 15f68b3c7..a406f02f8 100644
--- a/extensions/script/python/PythonBindings.h
+++ b/extensions/script/python/PythonBindings.h
@@ -30,7 +30,8 @@
 
 #include "PyProcessSession.h"
 #include "PythonProcessor.h"
-#include "PyBaseStream.h"
+#include "PyInputStream.h"
+#include "PyOutputStream.h"
 
 PYBIND11_EMBEDDED_MODULE(minifi_native, m) { // NOLINT
   namespace py = pybind11;
@@ -74,8 +75,10 @@ PYBIND11_EMBEDDED_MODULE(minifi_native, m) { // NOLINT
       .def("getName", &org::apache::nifi::minifi::core::Relationship::getName)
       .def("getDescription", &org::apache::nifi::minifi::core::Relationship::getDescription);
 
-  py::class_<python::PyBaseStream, std::shared_ptr<python::PyBaseStream>>(m, "BaseStream")
-      .def("read", static_cast<py::bytes (python::PyBaseStream::*)()>(&python::PyBaseStream::read))
-      .def("read", static_cast<py::bytes (python::PyBaseStream::*)(size_t)>(&python::PyBaseStream::read))
-      .def("write", &python::PyBaseStream::write);
+  py::class_<python::PyInputStream, std::shared_ptr<python::PyInputStream>>(m, "InputStream")
+      .def("read", static_cast<py::bytes (python::PyInputStream::*)()>(&python::PyInputStream::read))
+      .def("read", static_cast<py::bytes (python::PyInputStream::*)(size_t)>(&python::PyInputStream::read));
+
+  py::class_<python::PyOutputStream, std::shared_ptr<python::PyOutputStream>>(m, "OutputStream")
+      .def("write", &python::PyOutputStream::write);
 }
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 7afcc98df..e2167bfca 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -464,7 +464,7 @@ SFTPError SFTPClient::getLastError() const {
   return last_error_;
 }
 
-bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_t expected_size /*= -1*/) {
+std::optional<uint64_t> SFTPClient::getFile(const std::string& path, io::OutputStream& output, int64_t expected_size /*= -1*/) {
   /**
    * SFTP servers should not set the mode of an existing file on open
    * (see https://tools.ietf.org/html/draft-ietf-secsh-filexfer-13, Page 33
@@ -493,7 +493,7 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
       libssh2_session_last_error(ssh_session_, &err_msg, nullptr, 0);
       logger_->log_error("Failed to open remote file \"%s\" due to an underlying SSH error: %s", path.c_str(), err_msg);
     }
-    return false;
+    return std::nullopt;
   }
   const auto guard = gsl::finally([&file_handle]() {
     libssh2_sftp_close(file_handle);
@@ -507,7 +507,7 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
     if (read_ret < 0) {
       last_error_.setSftpError(SFTPError::IoError);
       logger_->log_error("Failed to read remote file \"%s\"", path.c_str());
-      return false;
+      return std::nullopt;
     } else if (read_ret == 0) {
       logger_->log_trace("EOF while reading remote file \"%s\"", path.c_str());
       break;
@@ -520,7 +520,7 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
       if (io::isError(write_ret)) {
         last_error_.setLibssh2Error(LIBSSH2_FX_OK);
         logger_->log_error("Failed to write output");
-        return false;
+        return std::nullopt;
       }
       remaining -= gsl::narrow<decltype(remaining)>(write_ret);
     }
@@ -529,13 +529,13 @@ bool SFTPClient::getFile(const std::string& path, io::BaseStream& output, int64_
   if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
     last_error_.setLibssh2Error(LIBSSH2_FX_OK);
     logger_->log_error("Remote file \"%s\" has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
-    return false;
+    return std::nullopt;
   }
 
-  return true;
+  return total_read;
 }
 
-bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool overwrite, int64_t expected_size /*= -1*/) {
+std::optional<uint64_t> SFTPClient::putFile(const std::string& path, io::InputStream& input, bool overwrite, int64_t expected_size /*= -1*/) {
   int flags = LIBSSH2_FXF_WRITE | LIBSSH2_FXF_CREAT | (overwrite ? LIBSSH2_FXF_TRUNC : LIBSSH2_FXF_EXCL);
   logger_->log_trace("Opening remote file \"%s\"", path.c_str());
   LIBSSH2_SFTP_HANDLE *file_handle = libssh2_sftp_open(sftp_session_, path.c_str(), flags, 0644);
@@ -559,7 +559,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
 
   /* If they just want a zero byte file, we are done */
   if (expected_size == 0) {
-    return true;
+    return 0;
   }
 
   const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
@@ -570,7 +570,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     if (io::isError(read_ret)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
-      return false;
+      return std::nullopt;
     } else if (read_ret == 0) {
       logger_->log_trace("EOF while reading input");
       break;
@@ -583,7 +583,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
       if (write_ret < 0) {
         last_error_.setSftpError(SFTPError::IoError);
         logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
-        return false;
+        return std::nullopt;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
       remaining -= gsl::narrow<size_t>(write_ret);
@@ -593,10 +593,10 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
   if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
     last_error_.setLibssh2Error(LIBSSH2_FX_OK);
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
-    return false;
+    return std::nullopt;
   }
 
-  return true;
+  return total_read;
 }
 
 bool SFTPClient::rename(const std::string& source_path, const std::string& target_path, bool overwrite) {
diff --git a/extensions/sftp/client/SFTPClient.h b/extensions/sftp/client/SFTPClient.h
index 9cc48b0ea..8cdf77eee 100644
--- a/extensions/sftp/client/SFTPClient.h
+++ b/extensions/sftp/client/SFTPClient.h
@@ -25,12 +25,12 @@
 #include <string>
 #include <tuple>
 #include <memory>
+#include <optional>
 
 #include "Exception.h"
 #include "utils/Enum.h"
 #include "utils/BaseHTTPClient.h"
 #include "core/logging/Logger.h"
-#include "io/BaseStream.h"
 
 namespace org::apache::nifi::minifi::utils {
 
@@ -117,9 +117,9 @@ class SFTPClient {
    */
   [[nodiscard]] SFTPError getLastError() const;
 
-  bool getFile(const std::string& path, io::BaseStream& output, int64_t expected_size = -1);
+  std::optional<uint64_t> getFile(const std::string& path, io::OutputStream& output, int64_t expected_size = -1);
 
-  bool putFile(const std::string& path, io::BaseStream& input, bool overwrite, int64_t expected_size = -1);
+  std::optional<uint64_t> putFile(const std::string& path, io::InputStream& input, bool overwrite, int64_t expected_size = -1);
 
   bool rename(const std::string& source_path, const std::string& target_path, bool overwrite);
 
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
index b65a4fffe..c1661249a 100644
--- a/extensions/sftp/processors/FetchSFTP.cpp
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -122,11 +122,12 @@ void FetchSFTP::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
 
   /* Download file */
   try {
-    session->write(flow_file, [&remote_file, &client](const std::shared_ptr<io::BaseStream>& stream) -> int64_t {
-      if (!client->getFile(remote_file, *stream)) {
+    session->write(flow_file, [&remote_file, &client](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
+      auto bytes_read = client->getFile(remote_file, *stream);
+      if (!bytes_read) {
         throw utils::SFTPException{client->getLastError()};
       }
-      return gsl::narrow<int64_t>(stream->size());
+      return gsl::narrow<int64_t>(*bytes_read);
     });
   } catch (const utils::SFTPException& ex) {
     logger_->log_debug(ex.what());
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index e01eaec81..f47d29e99 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -290,7 +290,7 @@ bool PutSFTP::processOne(const std::shared_ptr<core::ProcessContext> &context, c
   logger_->log_debug("The target path is %s, final target path is %s", target_path.c_str(), final_target_path.c_str());
 
   try {
-    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::BaseStream>& stream) {
+    session->read(flow_file, [&client, &target_path, this](const std::shared_ptr<io::InputStream>& stream) {
       if (!client->putFile(target_path,
           *stream,
           conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 8c735e849..5556c3181 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -40,7 +40,6 @@
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "FlowFileRecord.h"
-#include "io/BaseStream.h"
 #include "utils/gsl.h"
 
 namespace org {
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 68450057b..bb5d8a46b 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -94,7 +94,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
   session->transfer(flowFile, Success);
 }
 
-int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+int64_t ExtractText::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) const {
   size_t read_size = 0;
   bool regex_mode;
   size_t size_limit = flowFile_->getSize();
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index 0d06a3c53..415093008 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -75,7 +75,7 @@ class ExtractText : public core::Processor {
   class ReadCallback {
    public:
     ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<core::logging::Logger> lgr);
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const;
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const;
 
    private:
     std::shared_ptr<core::FlowFile> flowFile_;
diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp
index 2f7379f5f..3c6ecfb79 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -32,11 +32,7 @@
 #include "core/FlowFile.h"
 #include "core/Resource.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 const core::Property HashContent::HashAttribute("Hash Attribute", "Attribute to store checksum to", "Checksum");
 const core::Property HashContent::HashAlgorithm("Hash Algorithm", "Name of the algorithm used to generate checksum", "SHA256");
@@ -83,7 +79,7 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
   }
 
   logger_->log_trace("attempting read");
-  session->read(flowFile, [&flowFile, this](const std::shared_ptr<io::BaseStream>& stream) {
+  session->read(flowFile, [&flowFile, this](const std::shared_ptr<io::InputStream>& stream) {
     // This throws in case algo is not found, but that's fine
     logger_->log_trace("Searching for %s", algoName_);
     auto algo = HashAlgos.at(algoName_);
@@ -99,10 +95,6 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
 
 REGISTER_RESOURCE(HashContent, Processor);
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::processors
 
 #endif  // OPENSSL_SUPPORT
diff --git a/extensions/standard-processors/processors/HashContent.h b/extensions/standard-processors/processors/HashContent.h
index fa1fffe84..a0c877c90 100644
--- a/extensions/standard-processors/processors/HashContent.h
+++ b/extensions/standard-processors/processors/HashContent.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_HASHCONTENT_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_HASHCONTENT_H_
+#pragma once
 
 #ifdef OPENSSL_SUPPORT
 
@@ -37,7 +36,6 @@
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
-#include "io/BaseStream.h"
 #include "utils/StringUtils.h"
 #include "utils/Export.h"
 
@@ -47,7 +45,7 @@ using HashReturnType = std::pair<std::string, int64_t>;
 namespace { // NOLINT
 #define HASH_BUFFER_SIZE 16384
 
-  HashReturnType MD5Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+  HashReturnType MD5Hash(const std::shared_ptr<org::apache::nifi::minifi::io::InputStream>& stream) {
     HashReturnType ret_val;
     ret_val.second = 0;
     std::array<std::byte, HASH_BUFFER_SIZE> buffer{};
@@ -71,7 +69,7 @@ namespace { // NOLINT
     return ret_val;
   }
 
-  HashReturnType SHA1Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+  HashReturnType SHA1Hash(const std::shared_ptr<org::apache::nifi::minifi::io::InputStream>& stream) {
     HashReturnType ret_val;
     ret_val.second = 0;
     std::array<std::byte, HASH_BUFFER_SIZE> buffer{};
@@ -95,7 +93,7 @@ namespace { // NOLINT
     return ret_val;
   }
 
-  HashReturnType SHA256Hash(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+  HashReturnType SHA256Hash(const std::shared_ptr<org::apache::nifi::minifi::io::InputStream>& stream) {
     HashReturnType ret_val;
     ret_val.second = 0;
     std::array<std::byte, HASH_BUFFER_SIZE> buffer{};
@@ -123,7 +121,7 @@ namespace { // NOLINT
 
 namespace org::apache::nifi::minifi::processors {
 
-static const std::map<std::string, const std::function<HashReturnType(const std::shared_ptr<io::BaseStream>&)>> HashAlgos =
+static const std::map<std::string, const std::function<HashReturnType(const std::shared_ptr<io::InputStream>&)>> HashAlgos =
   { {"MD5",  MD5Hash}, {"SHA1", SHA1Hash}, {"SHA256", SHA256Hash} };
 
 class HashContent : public core::Processor {
@@ -171,5 +169,3 @@ class HashContent : public core::Processor {
 }  // namespace org::apache::nifi::minifi::processors
 
 #endif  // OPENSSL_SUPPORT
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_HASHCONTENT_H_
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index caa58d235..baddbb64d 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -185,7 +185,7 @@ std::string PutFile::tmpWritePath(const std::string &filename, const std::string
   return tmpFile;
 }
 
-bool PutFile::putFile(core::ProcessSession *session, std::shared_ptr<core::FlowFile> flowFile, const std::string &tmpFile, const std::string &destFile, const std::string &destDir) {
+bool PutFile::putFile(core::ProcessSession *session, const std::shared_ptr<core::FlowFile>& flowFile, const std::string &tmpFile, const std::string &destFile, const std::string &destDir) {
   if (!utils::file::exists(destDir) && try_mkdirs_) {
     // Attempt to create directories in file's path
     std::stringstream dir_path_stream;
@@ -296,7 +296,7 @@ PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
 }
 
 // Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t PutFile::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
   // Copy file contents into tmp file
   write_succeeded_ = false;
   size_t size = 0;
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index ae295dcd4..d2b2e1c11 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -17,8 +17,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_PUTFILE_H_
-#define EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_PUTFILE_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -32,11 +31,7 @@
 #include "utils/Id.h"
 #include "utils/Export.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace processors {
+namespace org::apache::nifi::minifi::processors {
 
 class PutFile : public core::Processor {
  public:
@@ -92,7 +87,7 @@ class PutFile : public core::Processor {
    public:
     ReadCallback(std::string tmp_file, std::string dest_file);
     ~ReadCallback();
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+    int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
     bool commit();
 
    private:
@@ -116,7 +111,7 @@ class PutFile : public core::Processor {
   int64_t max_dest_files_ = -1;
 
   bool putFile(core::ProcessSession *session,
-               std::shared_ptr<core::FlowFile> flowFile,
+               const std::shared_ptr<core::FlowFile>& flowFile,
                const std::string &tmpFile,
                const std::string &destFile,
                const std::string &destDir);
@@ -140,10 +135,4 @@ class PutFile : public core::Processor {
 #endif
 };
 
-}  // namespace processors
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // EXTENSIONS_STANDARD_PROCESSORS_PROCESSORS_PUTFILE_H_
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/processors/RouteText.cpp b/extensions/standard-processors/processors/RouteText.cpp
index 66921d0be..0f1592cb0 100644
--- a/extensions/standard-processors/processors/RouteText.cpp
+++ b/extensions/standard-processors/processors/RouteText.cpp
@@ -130,7 +130,7 @@ class RouteText::ReadCallback {
   ReadCallback(Segmentation segmentation, size_t file_size, Fn&& fn)
     : segmentation_(segmentation), file_size_(file_size), fn_(std::move(fn)) {}
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const {
     std::vector<std::byte> buffer;
     buffer.resize(file_size_);
     size_t ret = stream->read(buffer);
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 6c614fd9c..7a3f20194 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -218,8 +218,8 @@ class FileReaderCallback {
     openFile(file_name, offset, input_stream_, logger_);
   }
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
-    io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) {
+    io::CRCStream<io::OutputStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
 
     uint64_t num_bytes_written = 0;
     bool found_delimiter = false;
@@ -291,10 +291,10 @@ class WholeFileReaderCallback {
     return checksum_;
   }
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) {
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) {
     std::array<char, BUFFER_SIZE> buffer;
 
-    io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
+    io::CRCStream<io::OutputStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
 
     uint64_t num_bytes_written = 0;
 
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index 62be2cb65..77e70a6a0 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -182,7 +182,7 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext>& /*cont
   }
 }
 
-int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::InputStream>& stream) {
   std::string graph_proto_buf;
   graph_proto_buf.resize(stream->size());
   const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]), stream->size());
@@ -193,7 +193,7 @@ int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseS
   return gsl::narrow<int64_t>(num_read);
 }
 
-int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::InputStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
   const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
@@ -204,7 +204,7 @@ int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::Base
   return gsl::narrow<int64_t>(num_read);
 }
 
-int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::OutputStream>& stream) {
   auto tensor_proto_buf = tensor_proto_->SerializeAsString();
   auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
                                      static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFApplyGraph.h b/extensions/tensorflow/TFApplyGraph.h
index 2a67997d5..edfd5516f 100644
--- a/extensions/tensorflow/TFApplyGraph.h
+++ b/extensions/tensorflow/TFApplyGraph.h
@@ -23,6 +23,8 @@
 #include <core/Processor.h>
 #include <tensorflow/core/public/session.h>
 #include <concurrentqueue.h>
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -83,7 +85,7 @@ class TFApplyGraph : public core::Processor {
         : graph_def_(std::move(graph_def)) {
     }
     ~GraphReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::InputStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::GraphDef> graph_def_;
@@ -95,7 +97,7 @@ class TFApplyGraph : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::InputStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
@@ -107,7 +109,7 @@ class TFApplyGraph : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorWriteCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::OutputStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index d1bdc5e8f..2e329d928 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -304,7 +304,7 @@ void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContex
   }
 }
 
-int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr<io::InputStream>& stream) {
   if (tensor_->AllocatedBytes() < stream->size()) {
     throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
   }
@@ -315,7 +315,7 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr
   return num_read;
 }
 
-int64_t TFConvertImageToTensor::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFConvertImageToTensor::TensorWriteCallback::process(const std::shared_ptr<io::OutputStream>& stream) {
   auto tensor_proto_buf = tensor_proto_->SerializeAsString();
   auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
                                      static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
index ddb33690f..6c71bf180 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.h
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -23,6 +23,8 @@
 #include <core/Processor.h>
 #include <tensorflow/core/public/session.h>
 #include <concurrentqueue.h>
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -89,7 +91,7 @@ class TFConvertImageToTensor : public core::Processor {
         : tensor_(tensor) {
     }
     ~ImageReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::InputStream>& stream) override;
 
    private:
     tensorflow::Tensor *tensor_;
@@ -101,7 +103,7 @@ class TFConvertImageToTensor : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorWriteCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::OutputStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index 3e26d6f83..285a08dc4 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -116,7 +116,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext>&
   }
 }
 
-int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::InputStream>& stream) {
   size_t total_read = 0;
   std::string label;
   uint64_t max_label_len = 65536;
@@ -145,7 +145,7 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
   return gsl::narrow<int64_t>(total_read);
 }
 
-int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::OutputStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
   const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
diff --git a/extensions/tensorflow/TFExtractTopLabels.h b/extensions/tensorflow/TFExtractTopLabels.h
index cb1446dcb..bd707752d 100644
--- a/extensions/tensorflow/TFExtractTopLabels.h
+++ b/extensions/tensorflow/TFExtractTopLabels.h
@@ -23,6 +23,8 @@
 #include <core/Processor.h>
 #include <tensorflow/core/public/session.h>
 #include <concurrentqueue.h>
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 
 namespace org::apache::nifi::minifi::processors {
 
@@ -63,7 +65,7 @@ class TFExtractTopLabels : public core::Processor {
         : labels_(std::move(labels)) {
     }
     ~LabelsReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::InputStream>& stream) override;
 
    private:
     std::shared_ptr<std::vector<std::string>> labels_;
@@ -75,7 +77,7 @@ class TFExtractTopLabels : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorReadCallback() override = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
+    int64_t process(const std::shared_ptr<io::OutputStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index 98e4204e0..72923fc7b 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -384,7 +384,7 @@ GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> wri
       height_(height) {
 }
 
-int64_t GetUSBCamera::PNGWriteCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t GetUSBCamera::PNGWriteCallback::operator()(const std::shared_ptr<io::OutputStream>& stream) {
   std::lock_guard<std::mutex> lock(*png_write_mtx_);
   logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
   png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 869529f1f..54a17d2c9 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -118,7 +118,7 @@ class GetUSBCamera : public core::Processor {
   class PNGWriteCallback {
    public:
     PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height);
-    int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+    int64_t operator()(const std::shared_ptr<io::OutputStream>& stream);
 
    private:
     std::shared_ptr<std::mutex> png_write_mtx_;
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index 210fa935e..141f3b4ba 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -31,7 +31,6 @@
 #include <fstream>
 #include <set>
 #include "core/ContentRepository.h"
-#include "io/BaseStream.h"
 #include "core/FlowFile.h"
 #include "utils/TimeUtil.h"
 #include "core/logging/LoggerFactory.h"
diff --git a/libminifi/include/c2/PayloadSerializer.h b/libminifi/include/c2/PayloadSerializer.h
index effd297e1..aa9dc8bbd 100644
--- a/libminifi/include/c2/PayloadSerializer.h
+++ b/libminifi/include/c2/PayloadSerializer.h
@@ -26,7 +26,8 @@
 
 #include "core/state/Value.h"
 #include "c2/C2Protocol.h"
-#include "io/BaseStream.h"
+#include "io/OutputStream.h"
+#include "io/InputStream.h"
 #include "io/BufferStream.h"
 #include "utils/gsl.h"
 
@@ -41,7 +42,7 @@ class PayloadSerializer {
   /**
    * Static function that serializes the value nodes
    */
-  static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::BaseStream> stream) {
+  static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::OutputStream> stream) {
     auto base_type = value.getValue();
     if (!base_type) {
       uint8_t type = 0;
@@ -75,7 +76,7 @@ class PayloadSerializer {
       stream->write(str);
     }
   }
-  static void serialize(uint16_t op, const C2Payload &payload, std::shared_ptr<io::BaseStream> stream) {
+  static void serialize(uint16_t op, const C2Payload &payload, std::shared_ptr<io::OutputStream> stream) {
     uint8_t st;
     uint32_t size = gsl::narrow<uint32_t>(payload.getNestedPayloads().size());
     stream->write(size);
@@ -143,8 +144,8 @@ class PayloadSerializer {
     }
     return op;
   }
-  static std::shared_ptr<io::BaseStream> serialize(uint16_t version, const C2Payload &payload) {
-    std::shared_ptr<io::BaseStream> stream = std::make_shared<io::BufferStream>();
+  static std::shared_ptr<io::OutputStream> serialize(uint16_t version, const C2Payload &payload) {
+    std::shared_ptr<io::OutputStream> stream = std::make_shared<io::BufferStream>();
     uint16_t op = 0;
     uint8_t st = 0;
     op = opToInt(payload.getOperation());
@@ -176,7 +177,7 @@ class PayloadSerializer {
     return stream;
   }
 
-  static state::response::ValueNode deserializeValueNode(io::BaseStream *stream) {
+  static state::response::ValueNode deserializeValueNode(io::InputStream *stream) {
     uint8_t type = 0;
     stream->read(type);
     state::response::ValueNode node;
@@ -224,7 +225,7 @@ class PayloadSerializer {
    * @param identifier for this payload
    * @param stream base stream in which we will serialize the parent payload.
    */
-  static bool deserializePayload(C2Payload &parent, Operation operation, std::string identifier, io::BaseStream *stream) {
+  static bool deserializePayload(C2Payload &parent, Operation operation, std::string identifier, io::InputStream *stream) {
     uint32_t payloads = 0;
     stream->read(payloads);
     uint8_t op{}, st{};
diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h
index e109eb97a..e963d99f5 100644
--- a/libminifi/include/core/ContentRepository.h
+++ b/libminifi/include/core/ContentRepository.h
@@ -25,7 +25,6 @@
 #include "properties/Configure.h"
 #include "ResourceClaim.h"
 #include "io/BufferStream.h"
-#include "io/BaseStream.h"
 #include "StreamManager.h"
 #include "core/Connectable.h"
 #include "ContentSession.h"
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 29054e008..794d64997 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -82,11 +82,11 @@ class ProcessSession : public ReferenceContainer {
   // Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
   std::shared_ptr<core::FlowFile> clone(const std::shared_ptr<core::FlowFile> &parent, int64_t offset, int64_t size);
   // Transfer the FlowFile to the relationship
-  virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship);
+  virtual void transfer(const std::shared_ptr<core::FlowFile> &flow, const Relationship& relationship);
   // Put Attribute
-  void putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value);
+  void putAttribute(const std::shared_ptr<core::FlowFile> &flow, const std::string& key, const std::string& value);
   // Remove Attribute
-  void removeAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key);
+  void removeAttribute(const std::shared_ptr<core::FlowFile> &flow, const std::string& key);
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h b/libminifi/include/core/ProcessSessionReadCallback.h
index a89003b90..e4bf7d74d 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -17,27 +17,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSSESSIONREADCALLBACK_H_
-#define LIBMINIFI_INCLUDE_CORE_PROCESSSESSIONREADCALLBACK_H_
+#pragma once
 
 #include <memory>
 #include <string>
 
 #include "core/logging/LoggerFactory.h"
-#include "io/BaseStream.h"
 #include "FlowFileRecord.h"
+#include "io/InputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 class ProcessSessionReadCallback {
  public:
-  ProcessSessionReadCallback(const std::string &tmpFile, const std::string &destFile,
-      std::shared_ptr<logging::Logger> logger);
+  ProcessSessionReadCallback(const std::string &tmpFile, std::string destFile, std::shared_ptr<logging::Logger> logger);
   ~ProcessSessionReadCallback();
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
   bool commit();
 
  private:
@@ -47,9 +41,4 @@ class ProcessSessionReadCallback {
   std::string _tmpFile;
   std::string _destFile;
 };
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-#endif  // LIBMINIFI_INCLUDE_CORE_PROCESSSESSIONREADCALLBACK_H_
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/include/io/BaseStream.h b/libminifi/include/io/BaseStream.h
index bff331f1e..0c7b344a9 100644
--- a/libminifi/include/io/BaseStream.h
+++ b/libminifi/include/io/BaseStream.h
@@ -26,11 +26,7 @@
 #include "InputStream.h"
 #include "OutputStream.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
+namespace org::apache::nifi::minifi::io {
 
 /**
  * Base Stream is the base of a composable stream architecture.
@@ -42,8 +38,4 @@ namespace io {
  */
 class BaseStream : public InputStream, public OutputStream {};
 
-}  // namespace io
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 9afd66da0..606f69275 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -33,7 +33,8 @@
 #include <arpa/inet.h>
 
 #endif
-#include "BaseStream.h"
+#include "InputStream.h"
+#include "OutputStream.h"
 #include "Exception.h"
 
 namespace org {
diff --git a/libminifi/include/io/StreamCallback.h b/libminifi/include/io/StreamCallback.h
index 0116ea746..6d4e6c00a 100644
--- a/libminifi/include/io/StreamCallback.h
+++ b/libminifi/include/io/StreamCallback.h
@@ -21,12 +21,13 @@
 
 namespace org::apache::nifi::minifi::io {
 
-class BaseStream;
+class InputStream;
+class OutputStream;
 
 // FlowFile IO Callback functions for input and output
 // throw exception for error
-using InputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream)>;
-using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& output_stream)>;
-using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<BaseStream>& input_stream, const std::shared_ptr<BaseStream>& output_stream)>;
+using InputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream)>;
+using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<OutputStream>& output_stream)>;
+using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;
 
 }  // namespace org::apache::nifi::minifi::io
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index 25364ec1a..c24d04bfa 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -23,7 +23,8 @@
 #include <functional>
 #include <memory>
 #include <utility>
-#include "BaseStream.h"
+#include "InputStream.h"
+#include "OutputStream.h"
 #include "StreamCallback.h"
 
 namespace org::apache::nifi::minifi {
@@ -62,7 +63,7 @@ class InputStreamPipe {
  public:
   explicit InputStreamPipe(io::OutputStream& output) : output_(&output) {}
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) const {
     return internal::pipe(*stream, *output_);
   }
 
@@ -74,7 +75,7 @@ class OutputStreamPipe {
  public:
   explicit OutputStreamPipe(io::InputStream& input) : input_(&input) {}
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
     return internal::pipe(*input_, *stream);
   }
 
diff --git a/libminifi/include/io/StreamSlice.h b/libminifi/include/io/StreamSlice.h
index aedbd708f..bf53d3a10 100644
--- a/libminifi/include/io/StreamSlice.h
+++ b/libminifi/include/io/StreamSlice.h
@@ -20,7 +20,8 @@
 #include <algorithm>
 #include <memory>
 
-#include "BaseStream.h"
+#include "StreamCallback.h"
+#include "InputStream.h"
 
 namespace org::apache::nifi::minifi::io {
 
@@ -28,17 +29,14 @@ namespace org::apache::nifi::minifi::io {
  * A wrapped Base Stream with configurable offset and size
  * It hides the original (bigger stream) and acts like the stream starts and ends at the configured offset/size
  */
-class StreamSlice : public BaseStream {  // TODO(MINIFICPP-1648) This should be an InputStreamCallback, because writing to Slice is not supported
+class StreamSlice : public InputStream {
  public:
-  StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size);
+  StreamSlice(std::shared_ptr<io::InputStream>& stream, size_t offset, size_t size);
 
   // from InputStream
   size_t size() const override { return slice_size_; }
   size_t read(gsl::span<std::byte> out_buffer) override;
 
-  // from OutputStream
-  size_t write(const uint8_t*, size_t) override { throw std::runtime_error("write is not supported in StreamSlice"); }
-
   // from Stream
   void close() override { stream_->close(); }
   int initialize() override { return stream_->initialize(); }
@@ -48,7 +46,7 @@ class StreamSlice : public BaseStream {  // TODO(MINIFICPP-1648) This should be
   [[nodiscard]] gsl::span<const std::byte> getBuffer() const override;
 
  private:
-  const std::shared_ptr<io::BaseStream>& stream_;
+  const std::shared_ptr<io::InputStream>& stream_;
   size_t slice_offset_;
   size_t slice_size_;
 };
diff --git a/libminifi/include/io/ZlibStream.h b/libminifi/include/io/ZlibStream.h
index 3c1a2d8c9..33097371d 100644
--- a/libminifi/include/io/ZlibStream.h
+++ b/libminifi/include/io/ZlibStream.h
@@ -25,7 +25,7 @@
 #include <memory>
 #include <vector>
 
-#include "BaseStream.h"
+#include "OutputStream.h"
 #include "core/logging/Logger.h"
 #include "utils/gsl.h"
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index be1b3b0ab..8d57558f1 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -16,8 +16,7 @@
  * limitations under the License.
  */
 
-#ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
-#define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
+#pragma once
 
 #include <algorithm>
 #include <array>
@@ -34,11 +33,7 @@
 #include "core/Connectable.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace sitetosite {
+namespace org::apache::nifi::minifi::sitetosite {
 
 /**
  * Represents a piece of data that is to be sent to or that was received from a
@@ -219,7 +214,7 @@ class SiteToSiteClient : public core::Connectable {
   // read Respond
   virtual int readResponse(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message);
   // write respond
-  virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
+  virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, const std::string& message);
   // getRespondCodeContext
   virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
     for (auto & i : SiteToSiteRequest::respondCodeContext) {
@@ -266,10 +261,4 @@ class SiteToSiteClient : public core::Connectable {
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
 
-}  // namespace sitetosite
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
-
-#endif  // LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
+}  // namespace org::apache::nifi::minifi::sitetosite
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 3f68fa878..9d1fb4c85 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_
-#define LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_
+#pragma once
 
 #include <memory>
 #include <string>
@@ -36,7 +35,7 @@ class ByteInputCallback {
  public:
   virtual ~ByteInputCallback() = default;
 
-  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+  virtual int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
     stream->seek(0);
 
     if (stream->size() > 0) {
@@ -101,7 +100,7 @@ class ByteOutputCallback {
     close();
   }
 
-  virtual int64_t operator()(const std::shared_ptr<io::BaseStream>& stream);
+  virtual int64_t operator()(const std::shared_ptr<io::InputStream>& stream);
 
   virtual std::vector<char> to_string();
 
@@ -141,17 +140,4 @@ class ByteOutputCallback {
   std::shared_ptr<core::logging::Logger> logger_;
 };
 
-class StreamOutputCallback : public ByteOutputCallback {
- public:
-  explicit StreamOutputCallback(size_t max_size, bool wait_on_read = false)
-      : ByteOutputCallback(max_size, wait_on_read) {
-  }
-
-  void write(char *data, size_t size) override;
-
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) override;
-};
-
 }  // namespace org::apache::nifi::minifi::utils
-
-#endif  // LIBMINIFI_INCLUDE_UTILS_BYTEARRAYCALLBACK_H_
diff --git a/libminifi/include/utils/FileReaderCallback.h b/libminifi/include/utils/FileReaderCallback.h
index e1030a547..3d8233716 100644
--- a/libminifi/include/utils/FileReaderCallback.h
+++ b/libminifi/include/utils/FileReaderCallback.h
@@ -24,11 +24,7 @@
 #include "io/StreamPipe.h"
 #include "core/logging/Logger.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 /**
  * Simple callback to read a file, to be used with ProcessSession::write().
@@ -36,7 +32,7 @@ namespace utils {
 class FileReaderCallback {
  public:
   explicit FileReaderCallback(std::string file_name);
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& output_stream) const;
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) const;
 
  private:
   std::string file_name_;
@@ -49,8 +45,4 @@ class FileReaderCallbackIOError : public std::runtime_error {
   int error_code;
 };
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/JsonCallback.h b/libminifi/include/utils/JsonCallback.h
index 99bb07240..4c37afce0 100644
--- a/libminifi/include/utils/JsonCallback.h
+++ b/libminifi/include/utils/JsonCallback.h
@@ -36,7 +36,7 @@ namespace utils {
 class JsonInputCallback {
  public:
   explicit JsonInputCallback(rapidjson::Document& document) : document_(document) {}
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
     std::string content;
     content.resize(stream->size());
     const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>());
@@ -58,7 +58,7 @@ class JsonOutputCallback {
   explicit JsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
@@ -78,7 +78,7 @@ class PrettyJsonOutputCallback {
   explicit PrettyJsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
       : root_(std::move(root)), decimal_places_(decimal_places) {}
 
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) const {
+  int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) const {
     rapidjson::StringBuffer buffer;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
     if (decimal_places_.has_value())
diff --git a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
index 6538020e2..0d68fe568 100644
--- a/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
+++ b/libminifi/include/utils/LineByLineInputOutputStreamCallback.h
@@ -23,7 +23,8 @@
 #include <vector>
 
 #include "core/logging/Logger.h"
-#include "io/BaseStream.h"
+#include "io/InputStream.h"
+#include "io/OutputStream.h"
 #include "io/StreamPipe.h"
 
 namespace org::apache::nifi::minifi::utils {
@@ -32,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
  public:
   using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
   explicit LineByLineInputOutputStreamCallback(CallbackType callback);
-  int64_t operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output);
+  int64_t operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);
 
  private:
   int64_t readInput(io::InputStream& stream);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index ff5cbb1c7..7eca66138 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -201,14 +201,14 @@ void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
   provenance_report_->drop(flow, reason);
 }
 
-void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) {
+void ProcessSession::putAttribute(const std::shared_ptr<core::FlowFile> &flow, const std::string& key, const std::string& value) {
   flow->setAttribute(key, value);
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value;
   provenance_report_->modifyAttributes(flow, details.str());
 }
 
-void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow, std::string key) {
+void ProcessSession::removeAttribute(const std::shared_ptr<core::FlowFile> &flow, const std::string& key) {
   flow->removeAttribute(key);
   std::stringstream details;
   details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key;
@@ -221,7 +221,7 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
   flow->penalize(penalization_period);
 }
 
-void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
+void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, const Relationship& relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   utils::Identifier uuid = flow->getUUID();
   _transferRelationship[uuid] = relationship;
@@ -267,7 +267,7 @@ void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_fil
   writeBuffer(flow_file, buffer.as_span<const std::byte>());
 }
 void ProcessSession::writeBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
-  write(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+  write(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) {
     const auto write_status = output_stream->write(buffer);
     return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
   });
@@ -318,7 +318,7 @@ void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_fi
   appendBuffer(flow_file, buffer.as_span<const std::byte>());
 }
 void ProcessSession::appendBuffer(const std::shared_ptr<core::FlowFile>& flow_file, gsl::span<const std::byte> buffer) {
-  append(flow_file, [buffer](const std::shared_ptr<io::BaseStream>& output_stream) {
+  append(flow_file, [buffer](const std::shared_ptr<io::OutputStream>& output_stream) {
     const auto write_status = output_stream->write(buffer);
     return io::isError(write_status) ? -1 : gsl::narrow<int64_t>(write_status);
   });
@@ -339,7 +339,7 @@ int64_t ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, const
 
     claim = flow->getResourceClaim();
 
-    std::shared_ptr<io::BaseStream> stream = content_session_->read(claim);
+    std::shared_ptr<io::InputStream> stream = content_session_->read(claim);
 
     if (nullptr == stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to open flowfile content for read");
@@ -410,7 +410,7 @@ int64_t ProcessSession::readWrite(const std::shared_ptr<core::FlowFile> &flow, c
 
 detail::ReadBufferResult ProcessSession::readBuffer(const std::shared_ptr<core::FlowFile>& flow) {
   detail::ReadBufferResult result;
-  result.status = read(flow, [&result, this](const std::shared_ptr<io::BaseStream>& input_stream) {
+  result.status = read(flow, [&result, this](const std::shared_ptr<io::InputStream>& input_stream) {
     result.buffer.resize(input_stream->size());
     const auto read_status = input_stream->read(result.buffer);
     if (read_status != result.buffer.size()) {
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 810e46c89..caaab46b9 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,26 +24,21 @@
 #include <string>
 
 #include "core/logging/LoggerConfiguration.h"
-#include "io/BaseStream.h"
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
 ProcessSessionReadCallback::ProcessSessionReadCallback(const std::string &tmpFile,
-                                                       const std::string &destFile,
+                                                       std::string destFile,
                                                        std::shared_ptr<logging::Logger> logger)
-    : logger_(logger),
+    : logger_(std::move(logger)),
     _tmpFileOs(tmpFile, std::ios::binary),
     _tmpFile(tmpFile),
-    _destFile(destFile) {
+    _destFile(std::move(destFile)) {
 }
 
 // Copy the entire file contents to the temporary file
-int64_t ProcessSessionReadCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ProcessSessionReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
   // Copy file contents into tmp file
   _writeSucceeded = false;
   size_t size = 0;
@@ -95,8 +90,4 @@ ProcessSessionReadCallback::~ProcessSessionReadCallback() {
   std::remove(_tmpFile.c_str());
 }
 
-}  // namespace core
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/io/StreamSlice.cpp b/libminifi/src/io/StreamSlice.cpp
index d87f19345..182a6f511 100644
--- a/libminifi/src/io/StreamSlice.cpp
+++ b/libminifi/src/io/StreamSlice.cpp
@@ -20,7 +20,7 @@
 
 namespace org::apache::nifi::minifi::io {
 
-StreamSlice::StreamSlice(std::shared_ptr<io::BaseStream>& stream, size_t offset, size_t size) : stream_(stream), slice_offset_(offset), slice_size_(size) {
+StreamSlice::StreamSlice(std::shared_ptr<io::InputStream>& stream, size_t offset, size_t size) : stream_(stream), slice_offset_(offset), slice_size_(size) {
   stream_->seek(slice_offset_);
   if (stream_->size() < slice_offset_ + slice_size_)
     throw std::invalid_argument("StreamSlice is bigger than the Stream");
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index e0cfe3670..514599289 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -74,7 +74,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
   known_transactions_.erase(transactionID);
 }
 
-int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
+int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, const std::string& message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
   if (!resCode) {
     return -1;
@@ -465,7 +465,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
       return -1;
     }
     if (flowFile->getSize() > 0) {
-      session->read(flowFile, [packet](const std::shared_ptr<io::BaseStream>& input_stream) -> int64_t {
+      session->read(flowFile, [packet](const std::shared_ptr<io::InputStream>& input_stream) -> int64_t {
         const auto result = internal::pipe(*input_stream, packet->transaction_->getStream());
         if (result == -1) return -1;
         packet->_size = gsl::narrow<size_t>(result);
@@ -692,7 +692,7 @@ bool SiteToSiteClient::receiveFlowFiles(const std::shared_ptr<core::ProcessConte
       }
 
       if (packet._size > 0) {
-        session->write(flowFile, [&packet](const std::shared_ptr<io::BaseStream>& output_stream) -> int64_t {
+        session->write(flowFile, [&packet](const std::shared_ptr<io::OutputStream>& output_stream) -> int64_t {
           return internal::pipe(packet.transaction_->getStream(), *output_stream);
         });
         if (flowFile->getSize() != packet._size) {
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 8fe16a946..018d24790 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -24,13 +24,9 @@
 
 #include "utils/gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
-int64_t ByteOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
+int64_t ByteOutputCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
   stream->seek(0);
   if (stream->size() > 0) {
     std::vector<std::byte> buffer;
@@ -42,20 +38,6 @@ int64_t ByteOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& st
   return gsl::narrow<int64_t>(size_.load());
 }
 
-int64_t StreamOutputCallback::operator()(const std::shared_ptr<io::BaseStream>& stream) {
-  stream->seek(0);
-  std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
-  auto written = readFully(buffer.get(), size_);
-  stream->write(reinterpret_cast<uint8_t*>(buffer.get()), written);
-  return gsl::narrow<int64_t>(stream->size());
-}
-
-void StreamOutputCallback::write(char *data, size_t size) {
-  if (!is_alive_)
-    return;
-  write_and_notify(data, size);
-}
-
 std::vector<char> ByteOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.resize(size_.load());
@@ -173,8 +155,5 @@ bool ByteOutputCallback::preload_next_str() {
   size_ -= current_str.size();
   return true;
 }
-} /* namespace utils */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/FileReaderCallback.cpp b/libminifi/src/utils/FileReaderCallback.cpp
index 825ae3647..5d6e54c01 100644
--- a/libminifi/src/utils/FileReaderCallback.cpp
+++ b/libminifi/src/utils/FileReaderCallback.cpp
@@ -29,18 +29,14 @@ constexpr std::size_t BUFFER_SIZE = 4096;
 
 }  // namespace
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 FileReaderCallback::FileReaderCallback(std::string file_name)
     : file_name_{std::move(file_name)},
     logger_(core::logging::LoggerFactory<FileReaderCallback>::getLogger()) {
 }
 
-int64_t FileReaderCallback::operator()(const std::shared_ptr<io::BaseStream>& output_stream) const {
+int64_t FileReaderCallback::operator()(const std::shared_ptr<io::OutputStream>& output_stream) const {
   std::array<char, BUFFER_SIZE> buffer;
   uint64_t num_bytes_written = 0;
 
@@ -66,8 +62,4 @@ int64_t FileReaderCallback::operator()(const std::shared_ptr<io::BaseStream>& ou
   return num_bytes_written;
 }
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
index f06ba7786..9f11131d5 100644
--- a/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
+++ b/libminifi/src/utils/LineByLineInputOutputStreamCallback.cpp
@@ -25,7 +25,7 @@ LineByLineInputOutputStreamCallback::LineByLineInputOutputStreamCallback(Callbac
   : callback_(std::move(callback)) {
 }
 
-int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::BaseStream>& input, const std::shared_ptr<io::BaseStream>& output) {
+int64_t LineByLineInputOutputStreamCallback::operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output) {
   gsl_Expects(input);
   gsl_Expects(output);
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index df2bcf208..f061a08ab 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -54,7 +54,7 @@ class ReadCallback {
   ReadCallback& operator=(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback&&) = delete;
 
-  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<minifi::io::InputStream>& stream) {
     int64_t total_read = 0;
     do {
       const auto ret = stream->read(gsl::make_span(buffer_).subspan(read_size_));
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index dc5e3ba64..d3bb0c1c0 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -97,7 +97,7 @@ class FixedBuffer {
     } while (size_ != capacity_);
     return total_read;
   }
-  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  int64_t operator()(const std::shared_ptr<minifi::io::InputStream>& stream) {
     return write(*stream, capacity_);
   }
 
@@ -680,7 +680,7 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Un
   REQUIRE(attributes["mime.type"] == "application/tar");
 }
 
-void writeString(const std::string& str, const std::shared_ptr<minifi::io::BaseStream>& out) {
+void writeString(const std::string& str, const std::shared_ptr<minifi::io::OutputStream>& out) {
   out->write(reinterpret_cast<const uint8_t*>(str.data()), str.length());
 }
 
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 04c906a42..a73e9c19e 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -47,12 +47,12 @@ class ContentSessionController : public TestController {
   std::shared_ptr<core::ContentRepository> contentRepository;
 };
 
-const std::shared_ptr<minifi::io::BaseStream>& operator<<(const std::shared_ptr<minifi::io::BaseStream>& stream, const std::string& str) {
+const std::shared_ptr<minifi::io::OutputStream>& operator<<(const std::shared_ptr<minifi::io::OutputStream>& stream, const std::string& str) {
   REQUIRE(stream->write(reinterpret_cast<const uint8_t*>(str.data()), str.length()) == str.length());
   return stream;
 }
 
-const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<minifi::io::BaseStream>& stream, std::string& str) {
+const std::shared_ptr<minifi::io::InputStream>& operator>>(const std::shared_ptr<minifi::io::InputStream>& stream, std::string& str) {
   str = "";
   std::array<std::byte, 4096> buffer{};
   while (true) {
diff --git a/libminifi/test/rocksdb-tests/SwapTests.cpp b/libminifi/test/rocksdb-tests/SwapTests.cpp
index 053f08333..97716a58d 100644
--- a/libminifi/test/rocksdb-tests/SwapTests.cpp
+++ b/libminifi/test/rocksdb-tests/SwapTests.cpp
@@ -56,7 +56,7 @@ class OutputProcessor : public core::Processor {
     auto id = std::to_string(next_id_++);
     auto ff = session->create();
     ff->addAttribute("index", id);
-    session->write(ff, [&] (const std::shared_ptr<minifi::io::BaseStream>& output) -> int64_t {
+    session->write(ff, [&] (const std::shared_ptr<minifi::io::OutputStream>& output) -> int64_t {
       auto ret = output->write(gsl::span<const char>(id.data(), id.size()).as_span<const std::byte>());
       if (minifi::io::isError(ret)) {
         return -1;
diff --git a/libminifi/test/unit/CRCTests.cpp b/libminifi/test/unit/CRCTests.cpp
index a6d08b45e..3defb7eca 100644
--- a/libminifi/test/unit/CRCTests.cpp
+++ b/libminifi/test/unit/CRCTests.cpp
@@ -19,21 +19,21 @@
 #include <string>
 #include <vector>
 #include "io/CRCStream.h"
-#include "io/BufferStream.h"
+#include "io/OutputStream.h"
 #include "../TestBase.h"
 #include "../Catch.h"
 #include "utils/gsl.h"
 
 TEST_CASE("Test CRC1", "[testcrc1]") {
   org::apache::nifi::minifi::io::BufferStream base;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test(gsl::make_not_null(&base));
   test.write(reinterpret_cast<const uint8_t*>("cow"), 3);
   REQUIRE(2580823964 == test.getCRC());
 }
 
 TEST_CASE("Test CRC2", "[testcrc2]") {
   org::apache::nifi::minifi::io::BufferStream base;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test(gsl::make_not_null(&base));
   std::string fox = "the quick brown fox jumped over the brown fox";
   std::vector<uint8_t> charvect(fox.begin(), fox.end());
   test.write(charvect, charvect.size());
@@ -42,7 +42,7 @@ TEST_CASE("Test CRC2", "[testcrc2]") {
 
 TEST_CASE("Test CRC3", "[testcrc3]") {
   org::apache::nifi::minifi::io::BufferStream base;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test(gsl::make_not_null(&base));
   uint64_t number = 7;
   test.write(number);
   REQUIRE(4215687882 == test.getCRC());
@@ -50,7 +50,7 @@ TEST_CASE("Test CRC3", "[testcrc3]") {
 
 TEST_CASE("Test CRC4", "[testcrc4]") {
   org::apache::nifi::minifi::io::BufferStream base;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test(gsl::make_not_null(&base));
   uint32_t number = 7;
   test.write(number);
   REQUIRE(3206564543 == test.getCRC());
@@ -58,7 +58,7 @@ TEST_CASE("Test CRC4", "[testcrc4]") {
 
 TEST_CASE("Test CRC5", "[testcrc5]") {
   org::apache::nifi::minifi::io::BufferStream base;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test(gsl::make_not_null(&base));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test(gsl::make_not_null(&base));
   uint16_t number = 7;
   test.write(number);
   REQUIRE(3753740124 == test.getCRC());
@@ -66,10 +66,10 @@ TEST_CASE("Test CRC5", "[testcrc5]") {
 
 TEST_CASE("CRCStream with initial crc = 0 is the same as without initial crc", "[initial_crc_arg]") {
   org::apache::nifi::minifi::io::BufferStream base1;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_noinit(gsl::make_not_null(&base1));
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test_noinit(gsl::make_not_null(&base1));
 
   org::apache::nifi::minifi::io::BufferStream base2;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_initzero(gsl::make_not_null(&base2), 0);
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test_initzero(gsl::make_not_null(&base2), 0);
 
   const std::string textString = "The quick brown fox jumps over the lazy dog";
   std::vector<uint8_t> textVector1(textString.begin(), textString.end());
@@ -84,17 +84,17 @@ TEST_CASE("CRCStream: one long write is the same as writing in two pieces", "[in
   const std::string textString = "The quick brown fox jumps over the lazy dog";
 
   org::apache::nifi::minifi::io::BufferStream base_full;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_full(gsl::make_not_null(&base_full), 0);
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test_full(gsl::make_not_null(&base_full), 0);
   std::vector<uint8_t> textVector_full(textString.begin(), textString.end());
   test_full.write(textVector_full, textVector_full.size());
 
   org::apache::nifi::minifi::io::BufferStream base_piece1;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece1(gsl::make_not_null(&base_piece1), 0);
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test_piece1(gsl::make_not_null(&base_piece1), 0);
   std::vector<uint8_t> textVector_piece1(textString.begin(), textString.begin() + 15);
   test_piece1.write(textVector_piece1, textVector_piece1.size());
 
   org::apache::nifi::minifi::io::BufferStream base_piece2;
-  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::BaseStream> test_piece2(gsl::make_not_null(&base_piece2), test_piece1.getCRC());
+  org::apache::nifi::minifi::io::CRCStream<org::apache::nifi::minifi::io::OutputStream> test_piece2(gsl::make_not_null(&base_piece2), test_piece1.getCRC());
   std::vector<uint8_t> textVector_piece2(textString.begin() + 15, textString.end());
   test_piece2.write(textVector_piece2, textVector_piece2.size());
 
diff --git a/libminifi/test/unit/ContentRepositoryDependentTests.h b/libminifi/test/unit/ContentRepositoryDependentTests.h
index 17c60e612..a476b4558 100644
--- a/libminifi/test/unit/ContentRepositoryDependentTests.h
+++ b/libminifi/test/unit/ContentRepositoryDependentTests.h
@@ -36,7 +36,7 @@ namespace ContentRepositoryDependentTests {
 struct ReadUntilItCan {
   std::string value_;
 
-  int64_t operator()(const std::shared_ptr<minifi::io::BaseStream> &stream) {
+  int64_t operator()(const std::shared_ptr<minifi::io::InputStream> &stream) {
     value_.clear();
     std::array<std::byte, 1024> buffer{};
     size_t bytes_read = 0;
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp
index dcfca26eb..b623f4887 100644
--- a/libminifi/test/unit/FlowFileSerializationTests.cpp
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -19,7 +19,6 @@
 #include <memory>
 #include <string>
 
-#include "io/BaseStream.h"
 #include "serialization/FlowFileV3Serializer.h"
 #include "serialization/PayloadSerializer.h"
 #include "core/FlowFile.h"
diff --git a/libminifi/test/unit/SerializationTests.cpp b/libminifi/test/unit/SerializationTests.cpp
index 880bec58e..8ba130f92 100644
--- a/libminifi/test/unit/SerializationTests.cpp
+++ b/libminifi/test/unit/SerializationTests.cpp
@@ -19,7 +19,6 @@
 #include <memory>
 #include <string>
 
-#include "io/BaseStream.h"
 #include "SiteToSiteHelper.h"
 #include "../TestBase.h"
 #include "../Catch.h"
diff --git a/libminifi/test/unit/StreamTests.cpp b/libminifi/test/unit/StreamTests.cpp
index d2df3f0b3..dc77c5438 100644
--- a/libminifi/test/unit/StreamTests.cpp
+++ b/libminifi/test/unit/StreamTests.cpp
@@ -83,14 +83,16 @@ TEST_CASE("TestWrite1", "[testwrite]") {
 TEST_CASE("InvalidStreamSliceTest", "[teststreamslice]") {
   std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>();
   base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8);
-  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 0, 9), "StreamSlice is bigger than the Stream");
-  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(base, 7, 3), "StreamSlice is bigger than the Stream");
+  auto input_stream = std::static_pointer_cast<minifi::io::InputStream>(base);
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 0, 9), "StreamSlice is bigger than the Stream");
+  REQUIRE_THROWS_WITH(std::make_shared<minifi::io::StreamSlice>(input_stream, 7, 3), "StreamSlice is bigger than the Stream");
 }
 
 TEST_CASE("StreamSliceTest1", "[teststreamslice]") {
   std::shared_ptr<minifi::io::BaseStream> base = std::make_shared<minifi::io::BufferStream>();
   base->write((const uint8_t*)"\x01\x02\x03\x04\x05\x06\x07\x08", 8);
-  std::shared_ptr<minifi::io::BaseStream> stream_slice = std::make_shared<minifi::io::StreamSlice>(base, 2, 4);
+  auto input_stream = std::static_pointer_cast<minifi::io::InputStream>(base);
+  std::shared_ptr<minifi::io::InputStream> stream_slice = std::make_shared<minifi::io::StreamSlice>(input_stream, 2, 4);
   std::vector<std::byte> buffer;
   buffer.resize(stream_slice->size());
   REQUIRE(stream_slice->read(buffer) == 4);
diff --git a/nanofi/include/cxx/CallbackProcessor.h b/nanofi/include/cxx/CallbackProcessor.h
index b5ae9c061..082e4c35f 100644
--- a/nanofi/include/cxx/CallbackProcessor.h
+++ b/nanofi/include/cxx/CallbackProcessor.h
@@ -30,7 +30,6 @@
 #include <utility>
 #include <sys/types.h>
 #include "core/cstructs.h"
-#include "io/BaseStream.h"
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
diff --git a/nanofi/include/cxx/ReflexiveSession.h b/nanofi/include/cxx/ReflexiveSession.h
index 4804d9385..07232d3a1 100644
--- a/nanofi/include/cxx/ReflexiveSession.h
+++ b/nanofi/include/cxx/ReflexiveSession.h
@@ -15,8 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __REFLEXIVE_SESSION_H__
-#define __REFLEXIVE_SESSION_H__
+#pragma once
 
 #include <vector>
 #include <queue>
@@ -28,47 +27,30 @@
 
 #include "core/ProcessSession.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace core {
+namespace org::apache::nifi::minifi::core {
 
-// ReflexiveSession Class
 class ReflexiveSession : public ProcessSession{
  public:
-  // Constructor
-  /*!
-   * Create a new process session
-   */
   ReflexiveSession(std::shared_ptr<ProcessContext> processContext = nullptr)
-      : ProcessSession(processContext) {
+    : ProcessSession(processContext) {
   }
 
-// Destructor
-  virtual ~ReflexiveSession() = default;
-
-   virtual std::shared_ptr<core::FlowFile> get() {
-     auto prevff = ff;
-     ff = nullptr;
-     return prevff;
-   }
+  std::shared_ptr<core::FlowFile> get() override {
+    auto prevff = ff;
+    ff = nullptr;
+    return prevff;
+  }
 
-   virtual void add(const std::shared_ptr<core::FlowFile> &flow) {
-     ff = flow;
-   }
-   virtual void transfer(const std::shared_ptr<core::FlowFile>& /*flow*/, Relationship /*relationship*/) {
-     // no op
-   }
+  void add(const std::shared_ptr<core::FlowFile> &flow) override {
+    ff = flow;
+  }
+  void transfer(const std::shared_ptr<core::FlowFile>& /*flow*/, const Relationship& /*relationship*/) override {
+    // no op
+  }
  protected:
-  //
+
   // Get the FlowFile from the highest priority queue
   std::shared_ptr<core::FlowFile> ff;
 };
 
-} /* namespace core */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-#endif
+}  // namespace org::apache::nifi::minifi::core


[nifi-minifi-cpp] 01/02: MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0014de517014b46393ec4aca8a01022302c977b0
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Oct 12 16:46:52 2022 +0200

    MINIFICPP-1938 Enable parallel onTrigger calls for Azure and AWS processors
    
    Closes #1422
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 extensions/aws/processors/DeleteS3Object.h         |  2 +-
 extensions/aws/processors/FetchS3Object.h          |  2 +-
 extensions/aws/processors/PutS3Object.h            |  2 +-
 .../azure/processors/DeleteAzureBlobStorage.h      |  2 +-
 .../azure/processors/DeleteAzureDataLakeStorage.h  |  2 +-
 .../azure/processors/FetchAzureDataLakeStorage.h   |  2 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |  2 +-
 .../azure/storage/AzureBlobStorageClient.cpp       | 40 ++++++++--------------
 extensions/azure/storage/AzureBlobStorageClient.h  |  5 +--
 .../azure/storage/AzureDataLakeStorageClient.cpp   | 26 +++++---------
 .../azure/storage/AzureDataLakeStorageClient.h     | 13 +++----
 11 files changed, 36 insertions(+), 62 deletions(-)

diff --git a/extensions/aws/processors/DeleteS3Object.h b/extensions/aws/processors/DeleteS3Object.h
index 1566eb86a..8b3214cc2 100644
--- a/extensions/aws/processors/DeleteS3Object.h
+++ b/extensions/aws/processors/DeleteS3Object.h
@@ -52,7 +52,7 @@ class DeleteS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/aws/processors/FetchS3Object.h b/extensions/aws/processors/FetchS3Object.h
index 5a5b73344..49377ee77 100644
--- a/extensions/aws/processors/FetchS3Object.h
+++ b/extensions/aws/processors/FetchS3Object.h
@@ -56,7 +56,7 @@ class FetchS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index 13f0cbd60..f04f89671 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -76,7 +76,7 @@ class PutS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/DeleteAzureBlobStorage.h b/extensions/azure/processors/DeleteAzureBlobStorage.h
index f185094e0..22a8e51a3 100644
--- a/extensions/azure/processors/DeleteAzureBlobStorage.h
+++ b/extensions/azure/processors/DeleteAzureBlobStorage.h
@@ -52,7 +52,7 @@ class DeleteAzureBlobStorage final : public AzureBlobStorageSingleBlobProcessorB
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
index 89f0e682c..ea5f6277b 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -44,7 +44,7 @@ class DeleteAzureDataLakeStorage final : public AzureDataLakeStorageFileProcesso
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 7e9cd295a..f4d14265d 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -52,7 +52,7 @@ class FetchAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessor
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h b/extensions/azure/processors/PutAzureDataLakeStorage.h
index 420175a27..ac674dbcf 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -51,7 +51,7 @@ class PutAzureDataLakeStorage final : public AzureDataLakeStorageFileProcessorBa
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
diff --git a/extensions/azure/storage/AzureBlobStorageClient.cpp b/extensions/azure/storage/AzureBlobStorageClient.cpp
index 6b7ee3388..f83302ded 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.cpp
+++ b/extensions/azure/storage/AzureBlobStorageClient.cpp
@@ -54,59 +54,49 @@ AzureBlobStorageClient::AzureBlobStorageClient() {
   utils::AzureSdkLogger::initialize();
 }
 
-void AzureBlobStorageClient::resetClientIfNeeded(const AzureStorageCredentials &credentials, const std::string &container_name) {
-  if (container_client_ && credentials == credentials_ && container_name == container_name_) {
-    logger_->log_debug("Azure Blob Storage client credentials have not changed, no need to reset client");
-    return;
-  }
-
+std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> AzureBlobStorageClient::createClient(const AzureStorageCredentials &credentials, const std::string &container_name) {
   if (credentials.getUseManagedIdentityCredentials()) {
     auto storage_client = Azure::Storage::Blobs::BlobServiceClient(
       "https://" + credentials.getStorageAccountName() + ".blob." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>());
 
-    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
-    logger_->log_debug("Azure Blob Storage client has been reset with new managed identity credentials.");
+    return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(storage_client.GetBlobContainerClient(container_name));
   } else {
-    container_client_ = std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
+    return std::make_unique<Azure::Storage::Blobs::BlobContainerClient>(
       Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(credentials.buildConnectionString(), container_name));
-    logger_->log_debug("Azure Blob Storage client has been reset with new connection string credentials.");
   }
-
-  credentials_ = credentials;
-  container_name_ = container_name;
 }
 
 bool AzureBlobStorageClient::createContainerIfNotExists(const PutAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  return container_client_->CreateIfNotExists().Value.Created;
+  auto container_client = createClient(params.credentials, params.container_name);
+  return container_client->CreateIfNotExists().Value.Created;
 }
 
 Azure::Storage::Blobs::Models::UploadBlockBlobResult AzureBlobStorageClient::uploadBlob(const PutAzureBlobStorageParameters& params, gsl::span<const std::byte> buffer) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  auto blob_client = container_client_->GetBlockBlobClient(params.blob_name);
+  auto container_client = createClient(params.credentials, params.container_name);
+  auto blob_client = container_client->GetBlockBlobClient(params.blob_name);
   return blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(buffer.data()), buffer.size()).Value;
 }
 
 std::string AzureBlobStorageClient::getUrl(const AzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  return container_client_->GetUrl();
+  auto container_client = createClient(params.credentials, params.container_name);
+  return container_client->GetUrl();
 }
 
 bool AzureBlobStorageClient::deleteBlob(const DeleteAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
+  auto container_client = createClient(params.credentials, params.container_name);
   Azure::Storage::Blobs::DeleteBlobOptions delete_options;
   if (params.optional_deletion == OptionalDeletion::INCLUDE_SNAPSHOTS) {
     delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::IncludeSnapshots;
   } else if (params.optional_deletion == OptionalDeletion::DELETE_SNAPSHOTS_ONLY) {
     delete_options.DeleteSnapshots = Azure::Storage::Blobs::Models::DeleteSnapshotsOption::OnlySnapshots;
   }
-  auto response = container_client_->DeleteBlob(params.blob_name, delete_options);
+  auto response = container_client->DeleteBlob(params.blob_name, delete_options);
   return response.Value.Deleted;
 }
 
 std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAzureBlobStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.container_name);
-  auto blob_client = container_client_->GetBlobClient(params.blob_name);
+  auto container_client = createClient(params.credentials, params.container_name);
+  auto blob_client = container_client->GetBlobClient(params.blob_name);
   Azure::Storage::Blobs::DownloadBlobOptions options;
   if (params.range_start || params.range_length) {
     Azure::Core::Http::HttpRange range;
@@ -125,10 +115,10 @@ std::unique_ptr<io::InputStream> AzureBlobStorageClient::fetchBlob(const FetchAz
 
 std::vector<Azure::Storage::Blobs::Models::BlobItem> AzureBlobStorageClient::listContainer(const ListAzureBlobStorageParameters& params) {
   std::vector<Azure::Storage::Blobs::Models::BlobItem> result;
-  resetClientIfNeeded(params.credentials, params.container_name);
+  auto container_client = createClient(params.credentials, params.container_name);
   Azure::Storage::Blobs::ListBlobsOptions options;
   options.Prefix = params.prefix;
-  for (auto page_result = container_client_->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
+  for (auto page_result = container_client->ListBlobs(options); page_result.HasPage(); page_result.MoveToNextPage()) {
     result.insert(result.end(), page_result.Blobs.begin(), page_result.Blobs.end());
   }
   return result;
diff --git a/extensions/azure/storage/AzureBlobStorageClient.h b/extensions/azure/storage/AzureBlobStorageClient.h
index 3f9378b21..325f4ef24 100644
--- a/extensions/azure/storage/AzureBlobStorageClient.h
+++ b/extensions/azure/storage/AzureBlobStorageClient.h
@@ -43,11 +43,8 @@ class AzureBlobStorageClient : public BlobStorageClient {
   std::vector<Azure::Storage::Blobs::Models::BlobItem> listContainer(const ListAzureBlobStorageParameters& params) override;
 
  private:
-  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string &container_name);
+  static std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> createClient(const AzureStorageCredentials& credentials, const std::string &container_name);
 
-  AzureStorageCredentials credentials_;
-  std::string container_name_;
-  std::unique_ptr<Azure::Storage::Blobs::BlobContainerClient> container_client_;
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureBlobStorageClient>::getLogger()};
 };
 
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e26cc73e3..d64ba296b 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -34,12 +34,8 @@ AzureDataLakeStorageClient::AzureDataLakeStorageClient() {
   utils::AzureSdkLogger::initialize();
 }
 
-void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
-  if (client_ && credentials_ == credentials && file_system_name_ == file_system_name && number_of_retries_ == number_of_retries) {
-    logger_->log_debug("Azure Data Lake Storge client credentials have not changed, no need to reset client");
-    return;
-  }
-
+std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> AzureDataLakeStorageClient::createClient(
+    const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) {
   Azure::Storage::Files::DataLake::DataLakeClientOptions options;
   if (number_of_retries) {
     options.Retry.MaxRetries = *number_of_retries;
@@ -48,22 +44,16 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const AzureStorageCredentia
   if (credentials.getUseManagedIdentityCredentials()) {
     auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient(
         "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options);
-    client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
-    logger_->log_debug("Azure Data Lake Storge client has been reset with new managed identity credentials.");
+    return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name));
   } else {
-    client_ = std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
+    return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(
         Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options));
-    logger_->log_debug("Azure Data Lake Storge client has been reset with new connection string credentials.");
   }
-
-  file_system_name_ = file_system_name;
-  credentials_ = credentials;
-  number_of_retries_ = number_of_retries;
 }
 
 Azure::Storage::Files::DataLake::DataLakeDirectoryClient AzureDataLakeStorageClient::getDirectoryClient(const AzureDataLakeStorageParameters& params) {
-  resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
-  return client_->GetDirectoryClient(params.directory_name);
+  auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+  return client->GetDirectoryClient(params.directory_name);
 }
 
 Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageFileOperationParameters& params) {
@@ -113,8 +103,8 @@ std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const Fet
 std::vector<Azure::Storage::Files::DataLake::Models::PathItem> AzureDataLakeStorageClient::listDirectory(const ListAzureDataLakeStorageParameters& params) {
   std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
   if (params.directory_name.empty()) {
-    resetClientIfNeeded(params.credentials, params.file_system_name, params.number_of_retries);
-    for (auto page_result = client_->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
+    auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries);
+    for (auto page_result = client->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) {
       result.insert(result.end(), page_result.Paths.begin(), page_result.Paths.end());
     }
   } else {
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h b/extensions/azure/storage/AzureDataLakeStorageClient.h
index 308c118cc..2e9fd98ac 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -91,14 +91,11 @@ class AzureDataLakeStorageClient : public DataLakeStorageClient {
     Azure::Storage::Files::DataLake::Models::DownloadFileResult result_;
   };
 
-  void resetClientIfNeeded(const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
-  Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
-  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
-
-  AzureStorageCredentials credentials_;
-  std::string file_system_name_;
-  std::optional<uint64_t> number_of_retries_;
-  std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> client_;
+  static std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> createClient(
+    const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries);
+  static Azure::Storage::Files::DataLake::DataLakeDirectoryClient getDirectoryClient(const AzureDataLakeStorageParameters& params);
+  static Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const AzureDataLakeStorageFileOperationParameters& params);
+
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<AzureDataLakeStorageClient>::getLogger()};
 };