You are viewing a plain-text version of this content. The canonical link ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by sz...@apache.org on 2020/10/27 13:38:31 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1350 Support serializers in MergeContent

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 674fad7  MINIFICPP-1350 Support serializers in MergeContent
674fad7 is described below

commit 674fad7fdc082840bfd17bececb4fa5ac7678468
Author: Adam Debreceni <ad...@protonmail.com>
AuthorDate: Tue Oct 27 14:36:38 2020 +0100

    MINIFICPP-1350 Support serializers in MergeContent
    
    This closes #900
    
    Signed-off-by: Marton Szasz <sz...@gmail.com>
---
 extensions/bustache/ApplyTemplate.h                |   2 +-
 extensions/civetweb/processors/ListenHTTP.cpp      |   2 +-
 extensions/civetweb/processors/ListenHTTP.h        |   4 +-
 extensions/http-curl/client/HTTPCallback.h         |   2 +-
 .../http-curl/tests/unit/InvokeHTTPTests.cpp       |   2 +-
 extensions/jni/jvm/JniReferenceObjects.h           |   4 +-
 extensions/libarchive/CompressContent.h            |  10 +-
 extensions/libarchive/FocusArchiveEntry.cpp        |   2 +-
 extensions/libarchive/FocusArchiveEntry.h          |   2 +-
 extensions/libarchive/MergeContent.cpp             | 120 +++++++------
 extensions/libarchive/MergeContent.h               | 199 ++++++++++-----------
 extensions/libarchive/UnfocusArchiveEntry.cpp      |   2 +-
 extensions/libarchive/UnfocusArchiveEntry.h        |   2 +-
 extensions/librdkafka/PublishKafka.cpp             |   2 +-
 extensions/mqtt/processors/ConsumeMQTT.h           |   2 +-
 extensions/mqtt/processors/ConvertJSONAck.h        |   2 +-
 extensions/mqtt/processors/PublishMQTT.h           |   2 +-
 extensions/opc/include/fetchopc.h                  |   2 +-
 extensions/opc/include/putopc.h                    |   2 +-
 extensions/opc/src/putopc.cpp                      |   2 +-
 extensions/opencv/CaptureRTSPFrame.h               |   2 +-
 extensions/opencv/FrameIO.h                        |   4 +-
 .../SourceInitiatedSubscriptionListener.cpp        |   2 +-
 .../SourceInitiatedSubscriptionListener.h          |   2 +-
 extensions/script/lua/LuaProcessSession.h          |   4 +-
 extensions/script/python/PyProcessSession.h        |   4 +-
 extensions/sensors/SensorBase.h                    |   2 +-
 extensions/sftp/processors/FetchSFTP.cpp           |   2 +-
 extensions/sftp/processors/FetchSFTP.h             |   2 +-
 extensions/sftp/processors/PutSFTP.cpp             |   2 +-
 extensions/sftp/processors/PutSFTP.h               |   2 +-
 extensions/sql/data/WriteCallback.h                |   2 +-
 extensions/sqlite/ExecuteSQL.cpp                   |   2 +-
 extensions/sqlite/ExecuteSQL.h                     |   2 +-
 extensions/sqlite/PutSQL.cpp                       |   2 +-
 extensions/sqlite/PutSQL.h                         |   2 +-
 .../processors/ExecuteProcess.h                    |   2 +-
 .../standard-processors/processors/ExtractText.cpp |   2 +-
 .../standard-processors/processors/ExtractText.h   |   2 +-
 .../processors/GenerateFlowFile.h                  |   2 +-
 extensions/standard-processors/processors/GetTCP.h |   2 +-
 .../standard-processors/processors/HashContent.cpp |   2 +-
 .../standard-processors/processors/HashContent.h   |   2 +-
 .../standard-processors/processors/ListenSyslog.h  |   2 +-
 .../standard-processors/processors/LogAttribute.h  |   2 +-
 .../standard-processors/processors/PutFile.cpp     |   2 +-
 .../standard-processors/processors/PutFile.h       |   2 +-
 .../standard-processors/processors/TailFile.cpp    |   4 +-
 extensions/tensorflow/TFApplyGraph.cpp             |   6 +-
 extensions/tensorflow/TFApplyGraph.h               |   6 +-
 extensions/tensorflow/TFConvertImageToTensor.cpp   |   4 +-
 extensions/tensorflow/TFConvertImageToTensor.h     |   4 +-
 extensions/tensorflow/TFExtractTopLabels.cpp       |   4 +-
 extensions/tensorflow/TFExtractTopLabels.h         |   4 +-
 extensions/usb-camera/GetUSBCamera.cpp             |   4 +-
 extensions/usb-camera/GetUSBCamera.h               |   4 +-
 .../CollectorInitiatedSubscription.cpp             |   2 +-
 .../windows-event-log/ConsumeWindowsEventLog.cpp   |   2 +-
 libminifi/CMakeLists.txt                           |   2 +-
 libminifi/include/FlowFileRecord.h                 |  16 +-
 libminifi/include/core/ProcessSession.h            |   2 +-
 .../include/core/ProcessSessionReadCallback.h      |   2 +-
 libminifi/include/io/StreamPipe.h                  | 110 ++++++++++++
 .../include/serialization/FlowFileSerializer.h     |  39 ++--
 .../include/serialization/FlowFileV3Serializer.h   |  27 +--
 .../include/serialization/PayloadSerializer.h      |  23 +--
 libminifi/include/sitetosite/SiteToSiteClient.h    |   4 +-
 libminifi/include/utils/ByteArrayCallback.h        |   6 +-
 libminifi/include/utils/FileOutputCallback.h       |   2 +-
 libminifi/src/core/ProcessSession.cpp              |   8 +-
 libminifi/src/core/ProcessSessionReadCallback.cpp  |   2 +-
 .../src/serialization/FlowFileV3Serializer.cpp     | 113 ++++++++++++
 .../src/serialization/PayloadSerializer.cpp        |  26 +--
 libminifi/src/utils/ByteArrayCallback.cpp          |   4 +-
 libminifi/src/utils/FileOutputCallback.cpp         |   2 +-
 libminifi/test/BufferReader.h                      |   2 +-
 .../test/archive-tests/CompressContentTests.cpp    |   2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |  99 +++++++++-
 libminifi/test/unit/FlowFileSerializationTests.cpp |  85 +++++++++
 79 files changed, 716 insertions(+), 331 deletions(-)

diff --git a/extensions/bustache/ApplyTemplate.h b/extensions/bustache/ApplyTemplate.h
index 739d02a..e743ca8 100644
--- a/extensions/bustache/ApplyTemplate.h
+++ b/extensions/bustache/ApplyTemplate.h
@@ -61,7 +61,7 @@ class ApplyTemplate : public core::Processor {
   class WriteCallback : public OutputStreamCallback {
    public:
     WriteCallback(const std::string &templateFile, const std::shared_ptr<core::FlowFile> &flow_file);
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
    private:
     std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index 19befa4..39b7e8e 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -503,7 +503,7 @@ ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> reque
     : request_content_(std::move(request_content)) {
 }
 
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ListenHTTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   return stream->write(const_cast<uint8_t*>(request_content_->getBuffer()), request_content_->size());
 }
 
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index c774b5e..1904e20 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -129,7 +129,7 @@ class ListenHTTP : public core::Processor {
     explicit ResponseBodyReadCallback(std::string *out_str)
         : out_str_(out_str) {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       out_str_->resize(stream->size());
       uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
                                            gsl::narrow<int>(stream->size()));
@@ -149,7 +149,7 @@ class ListenHTTP : public core::Processor {
   class WriteCallback : public OutputStreamCallback {
    public:
     WriteCallback(std::unique_ptr<io::BufferStream>);
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::unique_ptr<io::BufferStream> request_content_;
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 5415979..827de10 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -78,7 +78,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
     seekInner(lock, pos);
   }
 
-  int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     std::vector<char> vec;
 
     if (stream->size() > 0) {
diff --git a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
index 4639f3e..14cd7f0 100644
--- a/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
+++ b/extensions/http-curl/tests/unit/InvokeHTTPTests.cpp
@@ -150,7 +150,7 @@ class CallBack : public minifi::OutputStreamCallback {
   }
   virtual ~CallBack() {
   }
-  virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+  virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     // leaving the typo for posterity sake
     std::string st = "we're gnna write some test stuff";
     return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(st.c_str())), st.length());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 2e9205b..7f50744 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -103,7 +103,7 @@ class JniByteOutStream : public minifi::OutputStreamCallback {
   }
 
   virtual ~JniByteOutStream() = default;
-  virtual int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+  virtual int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     return stream->write((uint8_t*) bytes_, length_);
   }
  private:
@@ -126,7 +126,7 @@ class JniByteInputStream : public minifi::InputStreamCallback {
     if (buffer_)
       delete[] buffer_;
   }
-  int64_t process(std::shared_ptr<minifi::io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
     stream_ = stream;
     return 0;
   }
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 66541de..8cda1d6 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -85,7 +85,7 @@ public:
         flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
     }
     ~ReadCallbackCompress() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       uint8_t buffer[4096U];
       int64_t ret = 0;
       uint64_t read_size = 0;
@@ -130,7 +130,7 @@ public:
       origin_offset_ = flow_->getOffset();
     }
     ~ReadCallbackDecompress() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       read_size_ = 0;
       stream->seek(offset_);
       int readRet = stream->read(buffer_, sizeof(buffer_));
@@ -208,7 +208,7 @@ public:
       archive_read_free(arch);
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       struct archive *arch;
       int r;
 
@@ -361,7 +361,7 @@ public:
     std::shared_ptr<core::ProcessSession> session_;
     bool success_{false};
 
-    int64_t process(std::shared_ptr<io::BaseStream> outputStream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& outputStream) override {
       class ReadCallback : public InputStreamCallback {
        public:
         ReadCallback(GzipWriteCallback& writer, std::shared_ptr<io::OutputStream> outputStream)
@@ -369,7 +369,7 @@ public:
           , outputStream_(std::move(outputStream)) {
         }
 
-        int64_t process(std::shared_ptr<io::BaseStream> inputStream) override {
+        int64_t process(const std::shared_ptr<io::BaseStream>& inputStream) override {
           std::vector<uint8_t> buffer(16 * 1024U);
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index ccce67c..ebe1721 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -166,7 +166,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
   return read;
 }
 
-int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   auto inputArchive = archive_read_new();
   struct archive_entry *entry;
   int64_t nlen = 0;
diff --git a/extensions/libarchive/FocusArchiveEntry.h b/extensions/libarchive/FocusArchiveEntry.h
index 8603f9d..e475c75 100644
--- a/extensions/libarchive/FocusArchiveEntry.h
+++ b/extensions/libarchive/FocusArchiveEntry.h
@@ -71,7 +71,7 @@ class FocusArchiveEntry : public core::Processor {
    public:
     explicit ReadCallback(core::Processor*, fileutils::FileManager *file_man, ArchiveMetadata *archiveMetadata);
     ~ReadCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+    virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
     bool isRunning() {return proc_->isRunning();}
 
    private:
diff --git a/extensions/libarchive/MergeContent.cpp b/extensions/libarchive/MergeContent.cpp
index 1b195c2..e702eb5 100644
--- a/extensions/libarchive/MergeContent.cpp
+++ b/extensions/libarchive/MergeContent.cpp
@@ -34,6 +34,8 @@
 #include "utils/GeneralUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
+#include "serialization/PayloadSerializer.h"
+#include "serialization/FlowFileV3Serializer.h"
 
 namespace org {
 namespace apache {
@@ -49,7 +51,11 @@ core::Property MergeContent::MergeStrategy(
 core::Property MergeContent::MergeFormat(
   core::PropertyBuilder::createProperty("Merge Format")
   ->withDescription("Merge Format")
-  ->withAllowableValues<std::string>({merge_content_options::MERGE_FORMAT_CONCAT_VALUE, merge_content_options::MERGE_FORMAT_TAR_VALUE, merge_content_options::MERGE_FORMAT_ZIP_VALUE})
+  ->withAllowableValues<std::string>({
+      merge_content_options::MERGE_FORMAT_CONCAT_VALUE,
+      merge_content_options::MERGE_FORMAT_TAR_VALUE,
+      merge_content_options::MERGE_FORMAT_ZIP_VALUE,
+      merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE})
   ->withDefaultValue(merge_content_options::MERGE_FORMAT_CONCAT_VALUE)->build());
 core::Property MergeContent::CorrelationAttributeName("Correlation Attribute Name", "Correlation Attribute Name", "");
 core::Property MergeContent::DelimiterStrategy(
@@ -73,9 +79,6 @@ core::Property MergeContent::AttributeStrategy(
   ->withAllowableValues<std::string>({merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON, merge_content_options::ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE})
   ->withDefaultValue(merge_content_options::ATTRIBUTE_STRATEGY_KEEP_COMMON)->build());
 core::Relationship MergeContent::Merge("merged", "The FlowFile containing the merged content");
-const char *BinaryConcatenationMerge::mimeType = "application/octet-stream";
-const char *TarMerge::mimeType = "application/tar";
-const char *ZipMerge::mimeType = "application/zip";
 
 void MergeContent::initialize() {
   // Set the supported properties
@@ -118,35 +121,17 @@ std::string MergeContent::readContent(std::string path) {
 }
 
 void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) {
-  std::string value;
   BinFiles::onSchedule(context, sessionFactory);
-  if (context->getProperty(MergeStrategy.getName(), value) && !value.empty()) {
-    mergeStrategy_ = value;
-  }
-  if (context->getProperty(MergeFormat.getName(), value) && !value.empty()) {
-    mergeFormat_ = value;
-  }
-  if (context->getProperty(CorrelationAttributeName.getName(), value) && !value.empty()) {
-    correlationAttributeName_ = value;
-  }
-  if (context->getProperty(DelimiterStrategy.getName(), value) && !value.empty()) {
-    delimiterStrategy_ = value;
-  }
-  if (context->getProperty(Header.getName(), value) && !value.empty()) {
-    header_ = value;
-  }
-  if (context->getProperty(Footer.getName(), value) && !value.empty()) {
-    footer_ = value;
-  }
-  if (context->getProperty(Demarcator.getName(), value) && !value.empty()) {
-    demarcator_ = value;
-  }
-  if (context->getProperty(KeepPath.getName(), value) && !value.empty()) {
-    org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, keepPath_);
-  }
-  if (context->getProperty(AttributeStrategy.getName(), value) && !value.empty()) {
-    attributeStrategy_ = value;
-  }
+
+  context->getProperty(MergeStrategy.getName(), mergeStrategy_);
+  context->getProperty(MergeFormat.getName(), mergeFormat_);
+  context->getProperty(CorrelationAttributeName.getName(), correlationAttributeName_);
+  context->getProperty(DelimiterStrategy.getName(), delimiterStrategy_);
+  context->getProperty(Header.getName(), header_);
+  context->getProperty(Footer.getName(), footer_);
+  context->getProperty(Demarcator.getName(), demarcator_);
+  context->getProperty(KeepPath.getName(), keepPath_);
+  context->getProperty(AttributeStrategy.getName(), attributeStrategy_);
 
   validatePropertyOptions();
 
@@ -155,6 +140,19 @@ void MergeContent::onSchedule(core::ProcessContext *context, core::ProcessSessio
   }
   logger_->log_debug("Merge Content: Strategy [%s] Format [%s] Correlation Attribute [%s] Delimiter [%s]", mergeStrategy_, mergeFormat_, correlationAttributeName_, delimiterStrategy_);
   logger_->log_debug("Merge Content: Footer [%s] Header [%s] Demarcator [%s] KeepPath [%d]", footer_, header_, demarcator_, keepPath_);
+
+  if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
+    if (!header_.empty()) {
+      logger_->log_warn("Header property only works with the Binary Concatenation format, value [%s] is ignored", header_);
+    }
+    if (!footer_.empty()) {
+      logger_->log_warn("Footer property only works with the Binary Concatenation format, value [%s] is ignored", footer_);
+    }
+    if (!demarcator_.empty()) {
+      logger_->log_warn("Demarcator property only works with the Binary Concatenation format, value [%s] is ignored", demarcator_);
+    }
+  }
+
   if (delimiterStrategy_ == merge_content_options::DELIMITER_STRATEGY_FILENAME) {
     if (!header_.empty()) {
       headerContent_ = readContent(header_);
@@ -182,7 +180,8 @@ void MergeContent::validatePropertyOptions() {
 
   if (mergeFormat_ != merge_content_options::MERGE_FORMAT_CONCAT_VALUE &&
       mergeFormat_ != merge_content_options::MERGE_FORMAT_TAR_VALUE &&
-      mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
+      mergeFormat_ != merge_content_options::MERGE_FORMAT_ZIP_VALUE &&
+      mergeFormat_ != merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
     logger_->log_error("Merge format not supported %s", mergeFormat_);
     throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Invalid merge format: " + mergeFormat_);
   }
@@ -298,20 +297,36 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
     return false;
   }
 
+  auto flowFileReader = [&] (const std::shared_ptr<core::FlowFile>& ff, InputStreamCallback* cb) {
+    return session->read(ff, cb);
+  };
+
+  const char* mimeType;
   std::unique_ptr<MergeBin> mergeBin;
-  if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE)
-    mergeBin = utils::make_unique<BinaryConcatenationMerge>();
-  else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE)
+  std::unique_ptr<minifi::FlowFileSerializer> serializer = utils::make_unique<PayloadSerializer>(flowFileReader);
+  if (mergeFormat_ == merge_content_options::MERGE_FORMAT_CONCAT_VALUE) {
+    mergeBin = utils::make_unique<BinaryConcatenationMerge>(headerContent_, footerContent_, demarcatorContent_);
+    mimeType = "application/octet-stream";
+  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE) {
+    // disregard header, demarcator, footer
+    mergeBin = utils::make_unique<BinaryConcatenationMerge>("", "", "");
+    serializer = utils::make_unique<FlowFileV3Serializer>(flowFileReader);
+    mimeType = "application/flowfile-v3";
+  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_TAR_VALUE) {
     mergeBin = utils::make_unique<TarMerge>();
-  else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE)
+    mimeType = "application/tar";
+  } else if (mergeFormat_ == merge_content_options::MERGE_FORMAT_ZIP_VALUE) {
     mergeBin = utils::make_unique<ZipMerge>();
-  else {
+    mimeType = "application/zip";
+  } else {
     logger_->log_error("Merge format not supported %s", mergeFormat_);
     return false;
   }
 
+  std::shared_ptr<core::FlowFile> mergeFlow;
   try {
-    mergeBin->merge(context, session, bin->getFlowFile(), headerContent_, footerContent_, demarcatorContent_, merge_flow);
+    mergeBin->merge(context, session, bin->getFlowFile(), *serializer, merge_flow);
+    session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, mimeType);
   } catch (...) {
     logger_->log_error("Merge Content merge catch exception");
     return false;
@@ -329,12 +344,15 @@ bool MergeContent::processBin(core::ProcessContext *context, core::ProcessSessio
   return true;
 }
 
-void BinaryConcatenationMerge::merge(core::ProcessContext*, core::ProcessSession *session,
-    std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator,
-    const std::shared_ptr<core::FlowFile> &merge_flow) {
-  BinaryConcatenationMerge::WriteCallback callback(header, footer, demarcator, flows, session);
+BinaryConcatenationMerge::BinaryConcatenationMerge(const std::string &header, const std::string& footer, const std::string &demarcator)
+  : header_(header),
+    footer_(footer),
+    demarcator_(demarcator) {}
+
+void BinaryConcatenationMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+  BinaryConcatenationMerge::WriteCallback callback(header_, footer_, demarcator_, flows, serializer);
   session->write(merge_flow, &callback);
-  session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
   std::string fileName;
   if (flows.size() == 1) {
     flows.front()->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
@@ -345,11 +363,10 @@ void BinaryConcatenationMerge::merge(core::ProcessContext*, core::ProcessSession
     session->putAttribute(merge_flow, core::SpecialFlowAttribute::FILENAME, fileName);
 }
 
-void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
-    std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, session);
+void TarMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_TAR_VALUE), flows, serializer);
   session->write(merge_flow, &callback);
-  session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
@@ -363,11 +380,10 @@ void TarMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::
   }
 }
 
-void ZipMerge::merge(core::ProcessContext*, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string&,
-    std::string&, std::string&, const std::shared_ptr<core::FlowFile> &merge_flow) {
-  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, session);
+void ZipMerge::merge(core::ProcessContext *context, core::ProcessSession *session,
+    std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile>& merge_flow) {
+  ArchiveMerge::WriteCallback callback(std::string(merge_content_options::MERGE_FORMAT_ZIP_VALUE), flows, serializer);
   session->write(merge_flow, &callback);
-  session->putAttribute(merge_flow, core::SpecialFlowAttribute::MIME_TYPE, getMergedContentType());
   std::string fileName;
   merge_flow->getAttribute(core::SpecialFlowAttribute::FILENAME, fileName);
   if (flows.size() == 1) {
diff --git a/extensions/libarchive/MergeContent.h b/extensions/libarchive/MergeContent.h
index 99cf457..cfa067a 100644
--- a/extensions/libarchive/MergeContent.h
+++ b/extensions/libarchive/MergeContent.h
@@ -25,6 +25,7 @@
 #include "archive_entry.h"
 #include "archive.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "serialization/FlowFileSerializer.h"
 
 namespace org {
 namespace apache {
@@ -38,11 +39,8 @@ constexpr const char *MERGE_STRATEGY_BIN_PACK = "Bin-Packing Algorithm";
 constexpr const char *MERGE_STRATEGY_DEFRAGMENT = "Defragment";
 constexpr const char *MERGE_FORMAT_TAR_VALUE = "TAR";
 constexpr const char *MERGE_FORMAT_ZIP_VALUE = "ZIP";
-constexpr const char *MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";
-constexpr const char *MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = "FlowFile Stream, v2";
-constexpr const char *MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile Tar, v1";
 constexpr const char *MERGE_FORMAT_CONCAT_VALUE = "Binary Concatenation";
-constexpr const char *MERGE_FORMAT_AVRO_VALUE = "Avro";
+constexpr const char* MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = "FlowFile Stream, v3";
 constexpr const char *DELIMITER_STRATEGY_FILENAME = "Filename";
 constexpr const char *DELIMITER_STRATEGY_TEXT = "Text";
 constexpr const char *ATTRIBUTE_STRATEGY_KEEP_COMMON = "Keep Only Common Attributes";
@@ -52,63 +50,33 @@ constexpr const char *ATTRIBUTE_STRATEGY_KEEP_ALL_UNIQUE = "Keep All Unique Attr
 
 // MergeBin Class
 class MergeBin {
-public:
-
+ public:
   virtual ~MergeBin() = default;
-  virtual std::string getMergedContentType() = 0;
   // merge the flows in the bin
   virtual void merge(core::ProcessContext *context, core::ProcessSession *session,
-      std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer, std::string &demarcator,
-      const std::shared_ptr<core::FlowFile> &flowFile) = 0;
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile) = 0;
 };
 
 // BinaryConcatenationMerge Class
 class BinaryConcatenationMerge : public MergeBin {
-public:
-  static const char *mimeType;
-  std::string getMergedContentType() override {
-    return mimeType;
-  }
-  void merge(
-    core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
-    std::string &header, std::string &footer, std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
-  // Nest Callback Class for read stream
-  class ReadCallback : public InputStreamCallback {
-   public:
-    ReadCallback(uint64_t size, std::shared_ptr<io::BaseStream> stream)
-        : buffer_size_(size), stream_(stream) {
-    }
-    ~ReadCallback() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      uint8_t buffer[4096U];
-      int64_t ret = 0;
-      uint64_t read_size = 0;
-      while (read_size < buffer_size_) {
-        int readRet = stream->read(buffer, sizeof(buffer));
-        if (readRet > 0) {
-          ret += stream_->write(buffer, readRet);
-          read_size += readRet;
-        } else {
-          break;
-        }
-      }
-      return ret;
-    }
-    uint64_t buffer_size_;
-    std::shared_ptr<io::BaseStream> stream_;
-  };
+ public:
+  BinaryConcatenationMerge(const std::string& header, const std::string& footer, const std::string& demarcator);
+
+  void merge(core::ProcessContext *context, core::ProcessSession *session,
+      std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &flowFile);
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
-  public:
-    WriteCallback(std::string &header, std::string &footer, std::string &demarcator, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
-      header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), session_(session) {
+   public:
+    WriteCallback(std::string &header, std::string &footer, std::string &demarcator,
+        std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer) :
+      header_(header), footer_(footer), demarcator_(demarcator), flows_(flows), serializer_(serializer) {
     }
     std::string &header_;
     std::string &footer_;
     std::string &demarcator_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
-    core::ProcessSession *session_;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    FlowFileSerializer& serializer_;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t ret = 0;
       if (!header_.empty()) {
         int64_t len = stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(header_.data())), header_.size());
@@ -124,9 +92,10 @@ public:
             return len;
           ret += len;
         }
-        ReadCallback readCb(flow->getSize(), stream);
-        session_->read(flow, &readCb);
-        ret += flow->getSize();
+        int len = serializer_.serialize(flow, stream);
+        if (len < 0)
+          return len;
+        ret += len;
         isFirst = false;
       }
       if (!footer_.empty()) {
@@ -138,46 +107,54 @@ public:
       return ret;
     }
   };
+
+ private:
+  std::string header_;
+  std::string footer_;
+  std::string demarcator_;
 };
 
 
 // Archive Class
 class ArchiveMerge {
-public:
-  // Nest Callback Class for read stream
-  class ReadCallback: public InputStreamCallback {
-  public:
-    ReadCallback(uint64_t size, struct archive *arch, struct archive_entry *entry) :
-        buffer_size_(size), arch_(arch), entry_(entry) {
-    }
-    ~ReadCallback() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
-      uint8_t buffer[4096U];
-      int64_t ret = 0;
-      uint64_t read_size = 0;
-      ret = archive_write_header(arch_, entry_);
-      while (read_size < buffer_size_) {
-        int readRet = stream->read(buffer, sizeof(buffer));
-        if (readRet > 0) {
-          ret += archive_write_data(arch_, buffer, readRet);
-          read_size += readRet;
+ public:
+  class ArchiveWriter : public io::OutputStream {
+   public:
+    ArchiveWriter(struct archive *arch, struct archive_entry *entry) : arch_(arch), entry_(entry) {}
+    int write(const uint8_t* data, int size) override {
+      if (!header_emitted_) {
+        if (archive_write_header(arch_, entry_) != ARCHIVE_OK) {
+          return -1;
+        }
+        header_emitted_ = true;
+      }
+      int totalWrote = 0;
+      int remaining = size;
+      while (remaining > 0) {
+        int ret = archive_write_data(arch_, data + totalWrote, remaining);
+        if (ret < 0) {
+          return ret;
         }
-        else {
+        if (ret == 0) {
           break;
         }
+        totalWrote += ret;
+        remaining -= ret;
       }
-      return ret;
+      return totalWrote;
     }
-    uint64_t buffer_size_;
+
+   private:
     struct archive *arch_;
     struct archive_entry *entry_;
+    bool header_emitted_{false};
   };
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
-  public:
-    WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, core::ProcessSession *session) :
-        merge_type_(merge_type), flows_(flows), session_(session),
-        logger_(logging::LoggerFactory<ArchiveMerge>::getLogger()) {
+   public:
+    WriteCallback(std::string merge_type, std::deque<std::shared_ptr<core::FlowFile>> &flows, FlowFileSerializer& serializer)
+        : merge_type_(merge_type), flows_(flows), serializer_(serializer),
+          logger_(logging::LoggerFactory<ArchiveMerge>::getLogger()) {
       size_ = 0;
       stream_ = nullptr;
     }
@@ -185,20 +162,33 @@ public:
 
     std::string merge_type_;
     std::deque<std::shared_ptr<core::FlowFile>> &flows_;
-    core::ProcessSession *session_;
     std::shared_ptr<io::BaseStream> stream_;
-    int64_t size_;
+    size_t size_;
     std::shared_ptr<logging::Logger> logger_;
+    FlowFileSerializer& serializer_;
 
     static la_ssize_t archive_write(struct archive *arch, void *context, const void *buff, size_t size) {
       WriteCallback *callback = (WriteCallback *) context;
-      la_ssize_t ret = callback->stream_->write(reinterpret_cast<uint8_t*>(const_cast<void*>(buff)), size);
-      if (ret > 0)
-        callback->size_ += (int64_t) ret;
-      return ret;
+      uint8_t* data = reinterpret_cast<uint8_t*>(const_cast<void*>(buff));
+      la_ssize_t totalWrote = 0;
+      size_t remaining = size;
+      while (remaining > 0) {
+        la_ssize_t ret = callback->stream_->write(data + totalWrote, remaining);
+        if (ret < 0) {
+          // libarchive expects us to return -1 on error
+          return -1;
+        }
+        if (ret == 0) {
+          break;
+        }
+        callback->size_ += ret;
+        totalWrote += ret;
+        remaining -= ret;
+      }
+      return totalWrote;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       struct archive *arch;
 
       arch = archive_write_new();
@@ -232,8 +222,10 @@ public:
             }
           }
         }
-        ReadCallback readCb(flow->getSize(), arch, entry);
-        session_->read(flow, &readCb);
+        int ret = serializer_.serialize(flow, std::make_shared<ArchiveWriter>(arch, entry));
+        if (ret < 0) {
+          return ret;
+        }
         archive_entry_free(entry);
       }
 
@@ -246,33 +238,26 @@ public:
 
 // TarMerge Class
 class TarMerge: public ArchiveMerge, public MergeBin {
-public:
-  static const char *mimeType;
-  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer,
-    std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
-  std::string getMergedContentType() override {
-    return mimeType;
-  }
+ public:
+  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
+             FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
 };
 
 // ZipMerge Class
 class ZipMerge: public ArchiveMerge, public MergeBin {
-public:
-  static const char *mimeType;
-  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows, std::string &header, std::string &footer,
-    std::string &demarcator, const std::shared_ptr<core::FlowFile> &flowFile) override;
-  std::string getMergedContentType() override {
-    return mimeType;
-  }
+ public:
+  void merge(core::ProcessContext *context, core::ProcessSession *session, std::deque<std::shared_ptr<core::FlowFile>> &flows,
+             FlowFileSerializer& serializer, const std::shared_ptr<core::FlowFile> &merge_flow) override;
 };
 
 class AttributeMerger {
-public:
+ public:
   explicit AttributeMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
     : flows_(flows) {}
   void mergeAttributes(core::ProcessSession *session, const std::shared_ptr<core::FlowFile> &merge_flow);
   virtual ~AttributeMerger() = default;
-protected:
+
+ protected:
   std::map<std::string, std::string> getMergedAttributes();
   virtual void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) = 0;
 
@@ -280,27 +265,29 @@ protected:
 };
 
 class KeepOnlyCommonAttributesMerger: public AttributeMerger {
-public:
+ public:
   explicit KeepOnlyCommonAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
     : AttributeMerger(flows) {}
-protected:
+
+ protected:
   void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;
 };
 
 class KeepAllUniqueAttributesMerger: public AttributeMerger {
-public:
+ public:
   explicit KeepAllUniqueAttributesMerger(std::deque<std::shared_ptr<org::apache::nifi::minifi::core::FlowFile>> &flows)
     : AttributeMerger(flows) {}
-protected:
+
+ protected:
   void processFlowFile(const std::shared_ptr<core::FlowFile> &flow_file, std::map<std::string, std::string> &merged_attributes) override;
 
-private:
+ private:
   std::vector<std::string> removed_attributes_;
 };
 
 // MergeContent Class
 class MergeContent : public processors::BinFiles {
-public:
+ public:
   // Constructor
   /*!
    * Create a new processor
diff --git a/extensions/libarchive/UnfocusArchiveEntry.cpp b/extensions/libarchive/UnfocusArchiveEntry.cpp
index 16aec91..1569c34 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.cpp
+++ b/extensions/libarchive/UnfocusArchiveEntry.cpp
@@ -158,7 +158,7 @@ la_ssize_t UnfocusArchiveEntry::WriteCallback::write_cb(struct archive *, void *
   return data->stream->write(const_cast<uint8_t*>(ui_buffer), length);
 }
 
-int64_t UnfocusArchiveEntry::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t UnfocusArchiveEntry::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   auto outputArchive = archive_write_new();
   int64_t nlen = 0;
 
diff --git a/extensions/libarchive/UnfocusArchiveEntry.h b/extensions/libarchive/UnfocusArchiveEntry.h
index 9ed5809..904455d 100644
--- a/extensions/libarchive/UnfocusArchiveEntry.h
+++ b/extensions/libarchive/UnfocusArchiveEntry.h
@@ -70,7 +70,7 @@ class UnfocusArchiveEntry : public core::Processor {
   class WriteCallback : public OutputStreamCallback {
    public:
     explicit WriteCallback(ArchiveMetadata *archiveMetadata);
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
    private:
     //! Logger
     std::shared_ptr<Logger> logger_;
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index 5b007a3..30c8847 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -377,7 +377,7 @@ class ReadCallback : public InputStreamCallback {
   ReadCallback(const ReadCallback&) = delete;
   ReadCallback& operator=(ReadCallback) = delete;
 
-  int64_t process(const std::shared_ptr<io::BaseStream> stream) override {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     std::vector<unsigned char> buffer;
 
     buffer.resize(max_seg_size_);
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index 18c0b33..b4e39a1 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -79,7 +79,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
     }
     MQTTClient_message *message_;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t len = stream->write(reinterpret_cast<uint8_t*>(message_->payload), message_->payloadlen);
       if (len < 0)
         status_ = -1;
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index c99cb1b..dc90cc3 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -73,7 +73,7 @@ class ConvertJSONAck : public ConvertBase {
    public:
     ReadCallback() = default;
     ~ReadCallback() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t ret = 0;
       if (nullptr == stream)
         return 0;
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index a3f4805..7925596 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -75,7 +75,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       read_size_ = 0;
     }
     ~ReadCallback() = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
       std::vector<unsigned char> buffer;
diff --git a/extensions/opc/include/fetchopc.h b/extensions/opc/include/fetchopc.h
index e8ff90c..cb60f31 100644
--- a/extensions/opc/include/fetchopc.h
+++ b/extensions/opc/include/fetchopc.h
@@ -82,7 +82,7 @@ protected:
     WriteCallback(std::string&& data)
       : data_(data) {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
     }
   };
diff --git a/extensions/opc/include/putopc.h b/extensions/opc/include/putopc.h
index 8dfe61c..ec58de2 100644
--- a/extensions/opc/include/putopc.h
+++ b/extensions/opc/include/putopc.h
@@ -80,7 +80,7 @@ class PutOPCProcessor : public BaseOPCProcessor {
   class ReadCallback : public InputStreamCallback {
   public:
     ReadCallback(std::shared_ptr<logging::Logger> logger) : logger_(logger) {}
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     const std::vector<uint8_t>& getContent() const { return buf_; }
 
   private:
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 2214489..36f8739 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -421,7 +421,7 @@ namespace processors {
     }
   }
 
-  int64_t PutOPCProcessor::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  int64_t PutOPCProcessor::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
     buf_.clear();
     buf_.resize(stream->size());
 
diff --git a/extensions/opencv/CaptureRTSPFrame.h b/extensions/opencv/CaptureRTSPFrame.h
index 5253f23..9a3f500 100644
--- a/extensions/opencv/CaptureRTSPFrame.h
+++ b/extensions/opencv/CaptureRTSPFrame.h
@@ -69,7 +69,7 @@ class CaptureRTSPFrame : public core::Processor {
     }
     ~CaptureRTSPFrameWriteCallback() override = default;
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       int64_t ret = 0;
       imencode(image_encoding_, image_mat_, image_buf_);
       ret = stream->write(image_buf_.data(), image_buf_.size());
diff --git a/extensions/opencv/FrameIO.h b/extensions/opencv/FrameIO.h
index 3359be9..bd3669e 100644
--- a/extensions/opencv/FrameIO.h
+++ b/extensions/opencv/FrameIO.h
@@ -32,7 +32,7 @@ class FrameWriteCallback : public OutputStreamCallback {
     }
     ~FrameWriteCallback() override = default;
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       int64_t ret = 0;
       imencode(image_encoding_, image_mat_, image_buf_);
       ret = stream->write(image_buf_.data(), image_buf_.size());
@@ -52,7 +52,7 @@ class FrameReadCallback : public InputStreamCallback {
     }
     ~FrameReadCallback() override = default;
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       int64_t ret = 0;
       image_buf_.resize(stream->getSize());
       ret = stream->read(image_buf_.data(), static_cast<int>(stream->getSize()));
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
index 8e697a0..9f14ea5 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.cpp
@@ -604,7 +604,7 @@ SourceInitiatedSubscriptionListener::Handler::WriteCallback::WriteCallback(char*
     : text_(text) {
 }
 
-int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t SourceInitiatedSubscriptionListener::Handler::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   return stream->write(reinterpret_cast<uint8_t*>(text_), strlen(text_));
 }
 
diff --git a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
index d884e3e..9ba0990 100644
--- a/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
+++ b/extensions/openwsman/processors/SourceInitiatedSubscriptionListener.h
@@ -93,7 +93,7 @@ class SourceInitiatedSubscriptionListener : public core::Processor {
     class WriteCallback : public OutputStreamCallback {
      public:
       explicit WriteCallback(char* text);
-      int64_t process(std::shared_ptr<io::BaseStream> stream);
+      int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
      private:
       char* text_;
diff --git a/extensions/script/lua/LuaProcessSession.h b/extensions/script/lua/LuaProcessSession.h
index f0cbb4f..67d558b 100644
--- a/extensions/script/lua/LuaProcessSession.h
+++ b/extensions/script/lua/LuaProcessSession.h
@@ -59,7 +59,7 @@ class LuaProcessSession {
       lua_callback_ = input_stream_callback;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       auto lua_stream = std::make_shared<LuaBaseStream>(stream);
       sol::function callback = lua_callback_["process"];
       return callback(lua_callback_, lua_stream);
@@ -75,7 +75,7 @@ class LuaProcessSession {
       lua_callback_ = output_stream_callback;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       auto lua_stream = std::make_shared<LuaBaseStream>(stream);
       sol::function callback = lua_callback_["process"];
       return callback(lua_callback_, lua_stream);
diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h
index 00bad31..33f61a3 100644
--- a/extensions/script/python/PyProcessSession.h
+++ b/extensions/script/python/PyProcessSession.h
@@ -64,7 +64,7 @@ class PyProcessSession {
       py_callback_ = input_stream_callback;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       auto py_stream = std::make_shared<PyBaseStream>(stream);
       return py_callback_.attr("process")(py_stream).cast<int64_t>();
     }
@@ -79,7 +79,7 @@ class PyProcessSession {
       py_callback_ = output_stream_callback;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       auto py_stream = std::make_shared<PyBaseStream>(stream);
       return py_callback_.attr("process")(py_stream).cast<int64_t>();
     }
diff --git a/extensions/sensors/SensorBase.h b/extensions/sensors/SensorBase.h
index e5c306d..19e7f18 100644
--- a/extensions/sensors/SensorBase.h
+++ b/extensions/sensors/SensorBase.h
@@ -71,7 +71,7 @@ class SensorBase : public core::Processor {
       }
       char *_data;
       uint64_t _dataSize;
-      int64_t process(std::shared_ptr<io::BaseStream> stream) {
+      int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
         int64_t ret = 0;
         if (_data && _dataSize > 0)
           ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
diff --git a/extensions/sftp/processors/FetchSFTP.cpp b/extensions/sftp/processors/FetchSFTP.cpp
index e9d05ea..d5bc1f9 100644
--- a/extensions/sftp/processors/FetchSFTP.cpp
+++ b/extensions/sftp/processors/FetchSFTP.cpp
@@ -157,7 +157,7 @@ FetchSFTP::WriteCallback::WriteCallback(const std::string& remote_file,
 
 FetchSFTP::WriteCallback::~WriteCallback() = default;
 
-int64_t FetchSFTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FetchSFTP::WriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   if (!client_.getFile(remote_file_, *stream)) {
     throw client_.getLastError();
   }
diff --git a/extensions/sftp/processors/FetchSFTP.h b/extensions/sftp/processors/FetchSFTP.h
index 0b27af0..6beb40a 100644
--- a/extensions/sftp/processors/FetchSFTP.h
+++ b/extensions/sftp/processors/FetchSFTP.h
@@ -82,7 +82,7 @@ class FetchSFTP : public SFTPProcessorBase {
     WriteCallback(const std::string& remote_file,
                  utils::SFTPClient& client);
     ~WriteCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/sftp/processors/PutSFTP.cpp b/extensions/sftp/processors/PutSFTP.cpp
index fc844a0..e9c1a8f 100644
--- a/extensions/sftp/processors/PutSFTP.cpp
+++ b/extensions/sftp/processors/PutSFTP.cpp
@@ -210,7 +210,7 @@ PutSFTP::ReadCallback::ReadCallback(const std::string& target_path,
 
 PutSFTP::ReadCallback::~ReadCallback() = default;
 
-int64_t PutSFTP::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutSFTP::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   if (!client_.putFile(target_path_,
       *stream,
       conflict_resolution_ == CONFLICT_RESOLUTION_REPLACE /*overwrite*/,
diff --git a/extensions/sftp/processors/PutSFTP.h b/extensions/sftp/processors/PutSFTP.h
index 3d2a115..dff0671 100644
--- a/extensions/sftp/processors/PutSFTP.h
+++ b/extensions/sftp/processors/PutSFTP.h
@@ -98,7 +98,7 @@ namespace processors {
         utils::SFTPClient& client,
         const std::string& conflict_resolution);
     ~ReadCallback();
-    virtual int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<logging::Logger> logger_;
diff --git a/extensions/sql/data/WriteCallback.h b/extensions/sql/data/WriteCallback.h
index ab961ad..5336cd0 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/extensions/sql/data/WriteCallback.h
@@ -33,7 +33,7 @@ public:
     : data_(data) {
   }
 
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
     if (data_.empty())
       return 0;
 
diff --git a/extensions/sqlite/ExecuteSQL.cpp b/extensions/sqlite/ExecuteSQL.cpp
index 1da955d..e42fc4e 100644
--- a/extensions/sqlite/ExecuteSQL.cpp
+++ b/extensions/sqlite/ExecuteSQL.cpp
@@ -172,7 +172,7 @@ void ExecuteSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   }
 }
 
-int64_t ExecuteSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ExecuteSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   sql_->resize(stream->size());
   auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
                                                           static_cast<int>(stream->size())));
diff --git a/extensions/sqlite/ExecuteSQL.h b/extensions/sqlite/ExecuteSQL.h
index 58a53a3..63b11ec 100644
--- a/extensions/sqlite/ExecuteSQL.h
+++ b/extensions/sqlite/ExecuteSQL.h
@@ -58,7 +58,7 @@ class ExecuteSQL : public core::Processor {
     explicit SQLReadCallback(std::shared_ptr<std::string> sql)
         : sql_(std::move(sql)) {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<std::string> sql_;
diff --git a/extensions/sqlite/PutSQL.cpp b/extensions/sqlite/PutSQL.cpp
index 8694f4e..c9d7bc8 100644
--- a/extensions/sqlite/PutSQL.cpp
+++ b/extensions/sqlite/PutSQL.cpp
@@ -172,7 +172,7 @@ void PutSQL::onTrigger(const std::shared_ptr<core::ProcessContext> &context,
   }
 }
 
-int64_t PutSQL::SQLReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutSQL::SQLReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   sql_->resize(stream->size());
   auto num_read = static_cast<uint64_t >(stream->read(reinterpret_cast<uint8_t *>(&(*sql_)[0]),
                                                           static_cast<int>(stream->size())));
diff --git a/extensions/sqlite/PutSQL.h b/extensions/sqlite/PutSQL.h
index 77d5aab..e071048 100644
--- a/extensions/sqlite/PutSQL.h
+++ b/extensions/sqlite/PutSQL.h
@@ -60,7 +60,7 @@ class PutSQL : public core::Processor {
     explicit SQLReadCallback(std::shared_ptr<std::string> sql)
         : sql_(std::move(sql)) {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<std::string> sql_;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.h b/extensions/standard-processors/processors/ExecuteProcess.h
index 8c6cdb3..49bf3a7 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.h
+++ b/extensions/standard-processors/processors/ExecuteProcess.h
@@ -92,7 +92,7 @@ class ExecuteProcess : public core::Processor {
     char *_data;
     uint64_t _dataSize;
     // void process(std::ofstream *stream) {
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t ret = 0;
       if (_data && _dataSize > 0)
         ret = stream->write(reinterpret_cast<uint8_t*>(_data), _dataSize);
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index d2b1b3b..345b84a 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -112,7 +112,7 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
   session->transfer(flowFile, Success);
 }
 
-int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   int64_t ret = 0;
   uint64_t read_size = 0;
   bool regex_mode;
diff --git a/extensions/standard-processors/processors/ExtractText.h b/extensions/standard-processors/processors/ExtractText.h
index e7df36f..10c9017 100644
--- a/extensions/standard-processors/processors/ExtractText.h
+++ b/extensions/standard-processors/processors/ExtractText.h
@@ -76,7 +76,7 @@ class ExtractText : public core::Processor {
      public:
         ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ct, std::shared_ptr<logging::Logger> lgr);
         ~ReadCallback() = default;
-        int64_t process(std::shared_ptr<io::BaseStream> stream);
+        int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
      private:
         std::shared_ptr<core::FlowFile> flowFile_;
diff --git a/extensions/standard-processors/processors/GenerateFlowFile.h b/extensions/standard-processors/processors/GenerateFlowFile.h
index 23bc1eb..b5d7dfd 100644
--- a/extensions/standard-processors/processors/GenerateFlowFile.h
+++ b/extensions/standard-processors/processors/GenerateFlowFile.h
@@ -70,7 +70,7 @@ class GenerateFlowFile : public core::Processor {
     WriteCallback(const std::vector<char>& data) : data_(data) { // NOLINT
     }
     std::vector<char> data_;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t ret = 0;
       if (data_.size() > 0)
         ret = stream->write(reinterpret_cast<uint8_t*>(&data_[0]), data_.size());
diff --git a/extensions/standard-processors/processors/GetTCP.h b/extensions/standard-processors/processors/GetTCP.h
index aaad66c..e582d44 100644
--- a/extensions/standard-processors/processors/GetTCP.h
+++ b/extensions/standard-processors/processors/GetTCP.h
@@ -92,7 +92,7 @@ class DataHandlerCallback : public OutputStreamCallback {
 
   virtual ~DataHandlerCallback() = default;
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
     return stream->write(message_, size_);
   }
 
diff --git a/extensions/standard-processors/processors/HashContent.cpp b/extensions/standard-processors/processors/HashContent.cpp
index a7ab109..7a4bbf3 100644
--- a/extensions/standard-processors/processors/HashContent.cpp
+++ b/extensions/standard-processors/processors/HashContent.cpp
@@ -93,7 +93,7 @@ void HashContent::onTrigger(core::ProcessContext *, core::ProcessSession *sessio
   session->transfer(flowFile, Success);
 }
 
-int64_t HashContent::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t HashContent::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   // This throws in case algo is not found, but that's fine
   parent_.logger_->log_trace("Searching for %s", parent_.algoName_);
   auto algo = HashAlgos.at(parent_.algoName_);
diff --git a/extensions/standard-processors/processors/HashContent.h b/extensions/standard-processors/processors/HashContent.h
index d55a172..1467cfb 100644
--- a/extensions/standard-processors/processors/HashContent.h
+++ b/extensions/standard-processors/processors/HashContent.h
@@ -162,7 +162,7 @@ class HashContent : public core::Processor {
    public:
     ReadCallback(std::shared_ptr<core::FlowFile> flowFile, const HashContent& parent);
     ~ReadCallback() {}
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
    private:
     std::shared_ptr<core::FlowFile> flowFile_;
diff --git a/extensions/standard-processors/processors/ListenSyslog.h b/extensions/standard-processors/processors/ListenSyslog.h
index bcf4f0d..fcf24a0 100644
--- a/extensions/standard-processors/processors/ListenSyslog.h
+++ b/extensions/standard-processors/processors/ListenSyslog.h
@@ -134,7 +134,7 @@ class ListenSyslog : public core::Processor {
     }
     uint8_t *_data;
     uint64_t _dataSize;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       int64_t ret = 0;
       if (_data && _dataSize > 0)
         ret = stream->write(_data, _dataSize);
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 190d3d1..d1df602 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -102,7 +102,7 @@ class LogAttribute : public core::Processor {
         : logger_(std::move(logger))
         , buffer_(size)  {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       if (buffer_.size() == 0U) {
         return 0U;
       }
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 5eaa332..bfc9490 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -242,7 +242,7 @@ PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::stri
 }
 
 // Copy the entire file contents to the temporary file
-int64_t PutFile::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   write_succeeded_ = false;
   size_t size = 0;
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 6c4a0c8..f938483 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -80,7 +80,7 @@ class PutFile : public core::Processor {
    public:
     ReadCallback(const std::string &tmp_file, const std::string &dest_file);
     ~ReadCallback() override;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     bool commit();
 
    private:
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 46bd952..547184a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -202,7 +202,7 @@ class FileReaderCallback : public OutputStreamCallback {
     openFile(file_name, offset, input_stream_, logger_);
   }
 
-  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
 
     uint64_t num_bytes_written = 0;
@@ -281,7 +281,7 @@ class WholeFileReaderCallback : public OutputStreamCallback {
     return checksum_;
   }
 
-  int64_t process(std::shared_ptr<io::BaseStream> output_stream) override {
+  int64_t process(const std::shared_ptr<io::BaseStream>& output_stream) override {
     std::array<char, BUFFER_SIZE> buffer;
 
     io::CRCStream<io::BaseStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index bc4aac6..4419caa 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -188,7 +188,7 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
   }
 }
 
-int64_t TFApplyGraph::GraphReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string graph_proto_buf;
   graph_proto_buf.resize(stream->size());
   auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]),
@@ -202,7 +202,7 @@ int64_t TFApplyGraph::GraphReadCallback::process(std::shared_ptr<io::BaseStream>
   return num_read;
 }
 
-int64_t TFApplyGraph::TensorReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
   auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
@@ -216,7 +216,7 @@ int64_t TFApplyGraph::TensorReadCallback::process(std::shared_ptr<io::BaseStream
   return num_read;
 }
 
-int64_t TFApplyGraph::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   auto tensor_proto_buf = tensor_proto_->SerializeAsString();
   auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
                                      static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFApplyGraph.h b/extensions/tensorflow/TFApplyGraph.h
index 3155b05..446b08d 100644
--- a/extensions/tensorflow/TFApplyGraph.h
+++ b/extensions/tensorflow/TFApplyGraph.h
@@ -64,7 +64,7 @@ class TFApplyGraph : public core::Processor {
         : graph_def_(std::move(graph_def)) {
     }
     ~GraphReadCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::GraphDef> graph_def_;
@@ -76,7 +76,7 @@ class TFApplyGraph : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorReadCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
@@ -88,7 +88,7 @@ class TFApplyGraph : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorWriteCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index bdc86e4..5f94548 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -317,7 +317,7 @@ void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContex
   }
 }
 
-int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   if (tensor_->AllocatedBytes() < stream->size()) {
     throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
   }
@@ -332,7 +332,7 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(std::shared_ptr<io::B
   return num_read;
 }
 
-int64_t TFConvertImageToTensor::TensorWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFConvertImageToTensor::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   auto tensor_proto_buf = tensor_proto_->SerializeAsString();
   auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
                                      static_cast<int>(tensor_proto_buf.size()));
diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h
index 268a041..dc0a02b 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.h
+++ b/extensions/tensorflow/TFConvertImageToTensor.h
@@ -70,7 +70,7 @@ class TFConvertImageToTensor : public core::Processor {
         : tensor_(tensor) {
     }
     ~ImageReadCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     tensorflow::Tensor *tensor_;
@@ -82,7 +82,7 @@ class TFConvertImageToTensor : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorWriteCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index dab3071..9bcb067 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -123,7 +123,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
   }
 }
 
-int64_t TFExtractTopLabels::LabelsReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   int64_t total_read = 0;
   std::string label;
   uint64_t max_label_len = 65536;
@@ -152,7 +152,7 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(std::shared_ptr<io::Base
   return total_read;
 }
 
-int64_t TFExtractTopLabels::TensorReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
   auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
diff --git a/extensions/tensorflow/TFExtractTopLabels.h b/extensions/tensorflow/TFExtractTopLabels.h
index 85b0244..7288e84 100644
--- a/extensions/tensorflow/TFExtractTopLabels.h
+++ b/extensions/tensorflow/TFExtractTopLabels.h
@@ -56,7 +56,7 @@ class TFExtractTopLabels : public core::Processor {
         : labels_(std::move(labels)) {
     }
     ~LabelsReadCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<std::vector<std::string>> labels_;
@@ -68,7 +68,7 @@ class TFExtractTopLabels : public core::Processor {
         : tensor_proto_(std::move(tensor_proto)) {
     }
     ~TensorReadCallback() override = default;
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<tensorflow::TensorProto> tensor_proto_;
diff --git a/extensions/usb-camera/GetUSBCamera.cpp b/extensions/usb-camera/GetUSBCamera.cpp
index ec2d40f..523da22 100644
--- a/extensions/usb-camera/GetUSBCamera.cpp
+++ b/extensions/usb-camera/GetUSBCamera.cpp
@@ -397,7 +397,7 @@ GetUSBCamera::PNGWriteCallback::PNGWriteCallback(std::shared_ptr<std::mutex> wri
       logger_(logging::LoggerFactory<PNGWriteCallback>::getLogger()) {
 }
 
-int64_t GetUSBCamera::PNGWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t GetUSBCamera::PNGWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::lock_guard<std::mutex> lock(*png_write_mtx_);
   logger_->log_info("Writing %d bytes of raw capture data to PNG output", frame_->data_bytes);
   png_structp png = png_create_write_struct(PNG_LIBPNG_VER_STRING, nullptr, nullptr, nullptr);
@@ -461,7 +461,7 @@ GetUSBCamera::RawWriteCallback::RawWriteCallback(uvc_frame_t *frame)
       logger_(logging::LoggerFactory<RawWriteCallback>::getLogger()) {
 }
 
-int64_t GetUSBCamera::RawWriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t GetUSBCamera::RawWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   logger_->log_info("Writing %d bytes of raw capture data", frame_->data_bytes);
   return stream->write(reinterpret_cast<uint8_t *>(frame_->data), frame_->data_bytes);
 }
diff --git a/extensions/usb-camera/GetUSBCamera.h b/extensions/usb-camera/GetUSBCamera.h
index 0699c7c..38c6884 100644
--- a/extensions/usb-camera/GetUSBCamera.h
+++ b/extensions/usb-camera/GetUSBCamera.h
@@ -98,7 +98,7 @@ class GetUSBCamera : public core::Processor {
   class PNGWriteCallback : public OutputStreamCallback {
    public:
     PNGWriteCallback(std::shared_ptr<std::mutex> write_mtx, uvc_frame_t *frame, uint32_t width, uint32_t height);
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     std::shared_ptr<std::mutex> png_write_mtx_;
@@ -113,7 +113,7 @@ class GetUSBCamera : public core::Processor {
   class RawWriteCallback : public OutputStreamCallback {
    public:
     explicit RawWriteCallback(uvc_frame_t *frame);
-    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
     uvc_frame_t *frame_;
diff --git a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
index 651bee4..55f741c 100644
--- a/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
+++ b/extensions/windows-event-log/CollectorInitiatedSubscription.cpp
@@ -633,7 +633,7 @@ int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::Pro
       status_ = 0;
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       return stream->write((uint8_t*)&str_[0], str_.size());
     }
 
diff --git a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
index db0f4b1..4a3d552 100644
--- a/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
+++ b/extensions/windows-event-log/ConsumeWindowsEventLog.cpp
@@ -651,7 +651,7 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&
       : str_(str) {
     }
 
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
       return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(str_.c_str())), str_.size());
     }
 
diff --git a/libminifi/CMakeLists.txt b/libminifi/CMakeLists.txt
index 333fb4c..464e71e 100644
--- a/libminifi/CMakeLists.txt
+++ b/libminifi/CMakeLists.txt
@@ -89,7 +89,7 @@ if (NOT OPENSSL_OFF)
 	set(TLS_SOURCES "src/io/tls/*.cpp")
 endif()
 
-file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp"  "src/provenance/*.cpp" "src/utils/*.cpp" "src/*.cpp")
+file(GLOB SOURCES  "src/utils/file/*.cpp" "src/sitetosite/*.cpp"  "src/core/logging/*.cpp"  "src/core/state/*.cpp" "src/core/state/nodes/*.cpp" "src/c2/protocols/*.cpp" "src/c2/triggers/*.cpp" "src/c2/*.cpp" "src/io/*.cpp" ${SOCKET_SOURCES} ${TLS_SOURCES} "src/core/controller/*.cpp" "src/controllers/*.cpp" "src/controllers/keyvalue/*.cpp" "src/core/*.cpp"  "src/core/repository/*.cpp" "src/core/yaml/*.cpp" "src/core/reporting/*.cpp" "src/serialization/*.cpp" "src/provenance/*.cpp" "src/ut [...]
 # manually add this as it might not yet be present when this executes
 list(APPEND SOURCES "src/agent/agent_version.cpp")
 
diff --git a/libminifi/include/FlowFileRecord.h b/libminifi/include/FlowFileRecord.h
index ed738b4..445af89 100644
--- a/libminifi/include/FlowFileRecord.h
+++ b/libminifi/include/FlowFileRecord.h
@@ -39,6 +39,7 @@
 #include "ResourceClaim.h"
 #include "Connection.h"
 #include "io/OutputStream.h"
+#include "io/StreamPipe.h"
 
 namespace org {
 namespace apache {
@@ -47,21 +48,6 @@ namespace minifi {
 
 #define DEFAULT_FLOWFILE_PATH "."
 
-// FlowFile IO Callback functions for input and output
-// throw exception for error
-class InputStreamCallback {
- public:
-  virtual ~InputStreamCallback() = default;
-  // virtual void process(std::ifstream *stream) = 0;
-
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
-};
-class OutputStreamCallback {
- public:
-  virtual ~OutputStreamCallback() = default;
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) = 0;
-};
-
 namespace core {
 class ProcessSession;
 }
diff --git a/libminifi/include/core/ProcessSession.h b/libminifi/include/core/ProcessSession.h
index 125439c..c429570 100644
--- a/libminifi/include/core/ProcessSession.h
+++ b/libminifi/include/core/ProcessSession.h
@@ -93,7 +93,7 @@ class ProcessSession : public ReferenceContainer {
   // Remove Flow File
   void remove(const std::shared_ptr<core::FlowFile> &flow);
   // Execute the given read callback against the content
-  void read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
+  int read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback);
   // Execute the given write callback against the content
   void write(const std::shared_ptr<core::FlowFile> &flow, OutputStreamCallback *callback);
   // Execute the given write/append callback against the content
diff --git a/libminifi/include/core/ProcessSessionReadCallback.h b/libminifi/include/core/ProcessSessionReadCallback.h
index 9ef7c6a..00a6f93 100644
--- a/libminifi/include/core/ProcessSessionReadCallback.h
+++ b/libminifi/include/core/ProcessSessionReadCallback.h
@@ -37,7 +37,7 @@ class ProcessSessionReadCallback : public InputStreamCallback {
   ProcessSessionReadCallback(const std::string &tmpFile, const std::string &destFile,
       std::shared_ptr<logging::Logger> logger);
   ~ProcessSessionReadCallback();
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
   bool commit();
 
  private:
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
new file mode 100644
index 0000000..b3fafce
--- /dev/null
+++ b/libminifi/include/io/StreamPipe.h
@@ -0,0 +1,110 @@
+/**
+ * @file StreamPipe.h
+ * Stream callback interfaces and helpers that pipe one stream into another
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include "BaseStream.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+// Callbacks handed to ProcessSession::read (input) and ProcessSession::write / append
+// (output) to process a flow file's content stream. Implementations may throw on error.
+class InputStreamCallback {
+ public:
+  virtual ~InputStreamCallback() = default;
+  // ProcessSession::read throws if this returns a negative value, otherwise returns it (narrowed to int).
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
+};
+class OutputStreamCallback {
+ public:
+  virtual ~OutputStreamCallback() = default;
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) = 0;
+};
+
+namespace internal {
+
+// Copies everything readable from `src` into `dst`, 4 KiB at a time, until `src`
+// reports end of input (a read of 0 bytes).
+// Returns the total number of bytes written to `dst`, or the first negative value
+// returned by either a read or a write.
+inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shared_ptr<io::OutputStream>& dst) {
+  uint8_t buffer[4096U];
+  int64_t totalTransferred = 0;
+  for (;;) {
+    const int bytesRead = src->read(buffer, sizeof(buffer));
+    if (bytesRead < 0) {
+      return bytesRead;  // read error
+    }
+    if (bytesRead == 0) {
+      return totalTransferred;  // end of input
+    }
+    // Keep writing until the whole chunk has been accepted by `dst`.
+    int written = 0;
+    while (written < bytesRead) {
+      // TODO(adebreceni): write might return 0, e.g. in case of a congested server;
+      //   this loop then retries without progress, and it is undecided whether the
+      //   number of bytes read or the number of bytes written should be returned.
+      const int writeRet = dst->write(buffer + written, bytesRead - written);
+      if (writeRet < 0) {
+        return writeRet;
+      }
+      written += writeRet;
+    }
+    totalTransferred += written;
+  }
+}
+
+}  // namespace internal
+
+// InputStreamCallback that copies the whole stream it is given into `output_`.
+class InputStreamPipe : public InputStreamCallback {
+ public:
+  explicit InputStreamPipe(std::shared_ptr<io::OutputStream> output)
+      : output_{std::move(output)} {}
+
+  // Result is internal::pipe's: bytes written, or a negative error code.
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { return internal::pipe(stream, output_); }
+
+ private:
+  std::shared_ptr<io::OutputStream> output_;
+};
+
+// OutputStreamCallback that fills the stream it is given with everything readable from `input_`.
+class OutputStreamPipe : public OutputStreamCallback {
+ public:
+  explicit OutputStreamPipe(std::shared_ptr<io::InputStream> input)
+      : input_{std::move(input)} {}
+
+  // Result is internal::pipe's: bytes written, or a negative error code.
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override { return internal::pipe(input_, stream); }
+
+ private:
+  std::shared_ptr<io::InputStream> input_;
+};
+
+}  // namespace minifi
+}  // namespace nifi
+}  // namespace apache
+}  // namespace org
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/FlowFileSerializer.h
similarity index 61%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/FlowFileSerializer.h
index ab961ad..02f21e4 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/FlowFileSerializer.h
@@ -18,29 +18,40 @@
 
 #pragma once
 
-#include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include <utility>
+#include <functional>
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
+namespace io {
+
+class OutputStream;
+
+} /* namespace io */
+
+namespace core {
+
+class FlowFile;
+
+} /* namespace core */
+
+class InputStreamCallback;
+
+class FlowFileSerializer {  // base for writers that emit a flow file into an io::OutputStream
+ public:
+  using FlowFileReader = std::function<int(const std::shared_ptr<core::FlowFile>&, InputStreamCallback*)>;  // same signature as ProcessSession::read
 
-class WriteCallback : public OutputStreamCallback {
-public:
-  explicit WriteCallback(const std::string& data)
-    : data_(data) {
-  }
+  explicit FlowFileSerializer(FlowFileReader reader) : reader_(std::move(reader)) {}
 
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
-    if (data_.empty())
-      return 0;
+  virtual int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) = 0;
 
-    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
-  }
+  virtual ~FlowFileSerializer() = default;
 
- const std::string& data_;
+ protected:
+  FlowFileReader reader_;  // supplies the flow file content to serialize() implementations
 };
 
 } /* namespace minifi */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/FlowFileV3Serializer.h
similarity index 59%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/FlowFileV3Serializer.h
index ab961ad..a6eb2cf 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/FlowFileV3Serializer.h
@@ -18,29 +18,30 @@
 
 #pragma once
 
+#include <limits>
 #include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include "io/OutputStream.h"
+#include "FlowFileSerializer.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
-class WriteCallback : public OutputStreamCallback {
-public:
-  explicit WriteCallback(const std::string& data)
-    : data_(data) {
-  }
+class FlowFileV3Serializer : public FlowFileSerializer {  // attributes + content in the FlowFile V3 layout
+  static constexpr uint8_t MAGIC_HEADER[] = {'N', 'i', 'F', 'i', 'F', 'F', '3'};  // written first by serialize()
+  // Lengths below this value are written as a uint16_t; otherwise this value is a marker followed by a uint32_t.
+  static constexpr uint16_t MAX_2_BYTE_VALUE = (std::numeric_limits<uint16_t>::max)();
+  // The helpers below return the number of bytes written, or a negative value on failure.
+  static int writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out);
 
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
-    if (data_.empty())
-      return 0;
+  static int writeString(const std::string& str, const std::shared_ptr<io::OutputStream>& out);
 
-    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
-  }
+ public:
+  using FlowFileSerializer::FlowFileSerializer;
 
- const std::string& data_;
+  int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
 };
 
 } /* namespace minifi */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/include/serialization/PayloadSerializer.h
similarity index 70%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/include/serialization/PayloadSerializer.h
index ab961ad..ed26e85 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/include/serialization/PayloadSerializer.h
@@ -18,29 +18,20 @@
 
 #pragma once
 
-#include <string>
-
-#include "FlowFileRecord.h"
+#include <memory>
+#include "io/OutputStream.h"
+#include "FlowFileSerializer.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
-class WriteCallback : public OutputStreamCallback {
-public:
-  explicit WriteCallback(const std::string& data)
-    : data_(data) {
-  }
-
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
-    if (data_.empty())
-      return 0;
-
-    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
-  }
+class PayloadSerializer : public FlowFileSerializer {  // writes only the flow file content, no attributes
+ public:
+  using FlowFileSerializer::FlowFileSerializer;
 
- const std::string& data_;
+  int serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) override;
 };
 
 } /* namespace minifi */
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 18d2285..df967ee 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -290,7 +290,7 @@ class WriteCallback : public OutputStreamCallback {
   }
   DataPacket *_packet;
   // void process(std::ofstream *stream) {
-  int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
     uint8_t buffer[16384];
     uint64_t len = _packet->_size;
     uint64_t total = 0;
@@ -316,7 +316,7 @@ class ReadCallback : public InputStreamCallback {
       : _packet(packet) {
   }
   DataPacket *_packet;
-  int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
     _packet->_size = 0;
     uint8_t buffer[8192] = { 0 };
     int readSize;
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 1639bf9..cbcdb4c 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -42,7 +42,7 @@ class ByteInputCallBack : public InputStreamCallback {
 
   virtual ~ByteInputCallBack() = default;
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream) {
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
     stream->seek(0);
 
     if (stream->size() > 0) {
@@ -107,7 +107,7 @@ class ByteOutputCallback : public OutputStreamCallback {
     close();
   }
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
   virtual const std::vector<char> to_string();
 
@@ -155,7 +155,7 @@ class StreamOutputCallback : public ByteOutputCallback {
 
   virtual void write(char *data, size_t size);
 
-  virtual int64_t process(std::shared_ptr<io::BaseStream> stream);
+  virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 };
 
 }  // namespace utils
diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h
index dfc906f..820ebe8 100644
--- a/libminifi/include/utils/FileOutputCallback.h
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -49,7 +49,7 @@ class FileOutputCallback : public ByteOutputCallback {
 
   virtual ~FileOutputCallback() = default;
 
-  int64_t process(std::shared_ptr<io::BaseStream> stream) override;
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
   const std::vector<char> to_string() override;
 
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index 2abf4cf..5309c8e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -281,7 +281,7 @@ void ProcessSession::append(const std::shared_ptr<core::FlowFile> &flow, OutputS
   }
 }
 
-void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
+int ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStreamCallback *callback) {
   try {
     std::shared_ptr<ResourceClaim> claim = nullptr;
 
@@ -289,7 +289,7 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
       // No existed claim for read, we throw exception
       logger_->log_debug("For %s, no resource claim but size is %d", flow->getUUIDStr(), flow->getSize());
       if (flow->getSize() == 0) {
-        return;
+        return 0;
       }
       throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
     }
@@ -304,9 +304,11 @@ void ProcessSession::read(const std::shared_ptr<core::FlowFile> &flow, InputStre
 
     stream->seek(flow->getOffset());
 
-    if (callback->process(stream) < 0) {
+    int ret = callback->process(stream);
+    if (ret < 0) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
     }
+    return ret;
   } catch (std::exception &exception) {
     logger_->log_debug("Caught Exception %s", exception.what());
     throw;
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index d0009bd..ff30f07 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -41,7 +41,7 @@ ProcessSessionReadCallback::ProcessSessionReadCallback(const std::string &tmpFil
 }
 
 // Copy the entire file contents to the temporary file
-int64_t ProcessSessionReadCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   // Copy file contents into tmp file
   _writeSucceeded = false;
   size_t size = 0;
diff --git a/libminifi/src/serialization/FlowFileV3Serializer.cpp b/libminifi/src/serialization/FlowFileV3Serializer.cpp
new file mode 100644
index 0000000..34a1087
--- /dev/null
+++ b/libminifi/src/serialization/FlowFileV3Serializer.cpp
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "serialization/FlowFileV3Serializer.h"
+#include "core/ProcessSession.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+
+constexpr uint8_t FlowFileV3Serializer::MAGIC_HEADER[];  // definition for the ODR-used constant; required before C++17, deprecated-but-valid after
+
+// Encodes a V3 field length: uint16_t if below MAX_2_BYTE_VALUE, else that marker plus a uint32_t. <0 on error.
+int FlowFileV3Serializer::writeLength(std::size_t length, const std::shared_ptr<io::OutputStream>& out) {
+  if (length < MAX_2_BYTE_VALUE) {
+    return out->write(static_cast<uint16_t>(length));
+  }
+  if (static_cast<uint64_t>(length) > (std::numeric_limits<uint32_t>::max)()) {
+    return -1;  // would not fit the 4-byte form and would be silently truncated
+  }
+  const int markerRet = out->write(static_cast<uint16_t>(MAX_2_BYTE_VALUE));
+  if (markerRet < 0) {
+    return markerRet;
+  }
+  const int lengthRet = out->write(static_cast<uint32_t>(length));
+  if (lengthRet < 0) {
+    return lengthRet;
+  }
+  return markerRet + lengthRet;
+}
+
+// Writes `str` as a length-prefixed V3 field (see writeLength). Returns the number of
+// bytes written, a negative write/length error, or -1 if the payload was written short.
+int FlowFileV3Serializer::writeString(const std::string &str, const std::shared_ptr<io::OutputStream> &out) {
+  const int lengthRet = writeLength(str.length(), out);
+  if (lengthRet < 0) {
+    return lengthRet;
+  }
+
+  const int dataRet = out->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), str.length());
+  if (dataRet < 0) {
+    return dataRet;
+  }
+  // dataRet >= 0 here, so the cast is value-preserving and avoids a signed/unsigned compare.
+  if (static_cast<std::size_t>(dataRet) != str.length()) {
+    return -1;
+  }
+  return lengthRet + dataRet;
+}
+
+// Serializes one flow file in the V3 layout: MAGIC_HEADER, attribute count,
+// length-prefixed key/value pairs, 8-byte content size, then the content (via reader_).
+// Returns the total number of bytes written, or a negative value on failure.
+int FlowFileV3Serializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
+  int sum = 0;
+  int ret = out->write(const_cast<uint8_t*>(MAGIC_HEADER), sizeof(MAGIC_HEADER));
+  if (ret < 0 || static_cast<std::size_t>(ret) != sizeof(MAGIC_HEADER)) {
+    return ret < 0 ? ret : -1;
+  }
+  sum += ret;
+  const auto& attributes = flowFile->getAttributes();
+  ret = writeLength(attributes.size(), out);
+  if (ret < 0) {
+    return ret;
+  }
+  sum += ret;
+  for (const auto& attrIt : attributes) {
+    ret = writeString(attrIt.first, out);
+    if (ret < 0) {
+      return ret;
+    }
+    sum += ret;
+    ret = writeString(attrIt.second, out);
+    if (ret < 0) {
+      return ret;
+    }
+    sum += ret;
+  }
+  const uint64_t contentSize = flowFile->getSize();
+  ret = out->write(contentSize);
+  if (ret < 0) {
+    return ret;
+  }
+  sum += ret;
+  InputStreamPipe pipe(out);
+  ret = reader_(flowFile, &pipe);
+  // The header promised exactly contentSize bytes; any other count would corrupt the package.
+  if (ret < 0 || static_cast<uint64_t>(ret) != contentSize) {
+    return ret < 0 ? ret : -1;
+  }
+  return sum + ret;
+}
+
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */
diff --git a/extensions/sql/data/WriteCallback.h b/libminifi/src/serialization/PayloadSerializer.cpp
similarity index 69%
copy from extensions/sql/data/WriteCallback.h
copy to libminifi/src/serialization/PayloadSerializer.cpp
index ab961ad..44b0ed4 100644
--- a/extensions/sql/data/WriteCallback.h
+++ b/libminifi/src/serialization/PayloadSerializer.cpp
@@ -16,32 +16,18 @@
  * limitations under the License.
  */
 
-#pragma once
-
-#include <string>
-
-#include "FlowFileRecord.h"
+#include "serialization/PayloadSerializer.h"
+#include "core/ProcessSession.h"
 
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 
-class WriteCallback : public OutputStreamCallback {
-public:
-  explicit WriteCallback(const std::string& data)
-    : data_(data) {
-  }
-
- int64_t process(std::shared_ptr<io::BaseStream> stream) {
-    if (data_.empty())
-      return 0;
-
-    return stream->write(reinterpret_cast<uint8_t*>(const_cast<char*>(data_.c_str())), data_.size());
-  }
-
- const std::string& data_;
-};
+/**
+ * Writes only the content of `flowFile` to `out`, with no attributes and no framing. The content
+ * is obtained through reader_, which is handed an InputStreamPipe forwarding into `out`.
+ *
+ * @return the value produced by reader_
+ */
+int PayloadSerializer::serialize(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<io::OutputStream>& out) {
+  InputStreamPipe contentToOut(out);
+  const auto readResult = reader_(flowFile, &contentToOut);
+  return readResult;
+}
 
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index a2abe0f..c4545ec 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -26,7 +26,7 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   if (stream->size() > 0) {
     std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
@@ -37,7 +37,7 @@ int64_t ByteOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
   return size_.load();
 }
 
-int64_t StreamOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   stream->seek(0);
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp
index 182cf09..6dfa565 100644
--- a/libminifi/src/utils/FileOutputCallback.cpp
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -26,7 +26,7 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-int64_t FileOutputCallback::process(std::shared_ptr<io::BaseStream> stream) {
+int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   if (stream->size() > 0) {
     file_stream_.write(reinterpret_cast<char*>(const_cast<uint8_t*>(stream->getBuffer())), stream->size());
     size_ += stream->size();
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index caa0aa8..783c95b 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -42,7 +42,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
     return total_read;
   }
 
-  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    // Delegates to write(), passing the stream's full size as the number of bytes to consume.
     return write(*stream.get(), stream->size());
   }
 
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 94623b5..cd9c08c 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -56,7 +56,7 @@ class ReadCallback: public org::apache::nifi::minifi::InputStreamCallback {
     if (archive_buffer_)
       delete[] archive_buffer_;
   }
-  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
     int64_t total_read = 0;
     int64_t ret = 0;
     do {
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 3b1f68a..4f3bdc0 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -36,6 +36,8 @@
 #include "processors/LogAttribute.h"
 #include "../TestBase.h"
 #include "../unit/ProvenanceTestHelper.h"
+#include "serialization/FlowFileV3Serializer.h"
+#include "serialization/PayloadSerializer.h"
 
 std::string FLOW_FILE;
 std::string EXPECT_MERGE_CONTENT_FIRST;
@@ -93,7 +95,7 @@ class FixedBuffer : public org::apache::nifi::minifi::InputStreamCallback {
     } while (size_ != capacity_);
     return total_read;
   }
-  int64_t process(std::shared_ptr<org::apache::nifi::minifi::io::BaseStream> stream) {
+  int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
+    // Delegates to write(), requesting capacity_ bytes from the stream.
     return write(*stream.get(), capacity_);
   }
 
@@ -845,3 +847,98 @@ TEST_CASE_METHOD(MergeTestController, "Test Merge File Attributes Keeping All Un
   REQUIRE(attributes["tagCommon"] == "common");
   REQUIRE(attributes["mime.type"] == "application/tar");
 }
+
+// Appends the raw bytes of `str` (no length prefix) to `out`; used to assemble the expected
+// merge output by hand. Internal linkage: this helper is local to this test file.
+static void writeString(const std::string& str, const std::shared_ptr<minifi::io::BaseStream>& out) {
+  out->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(str.data())), str.length());
+}
+
+TEST_CASE("FlowFile serialization", "[testFlowFileSerialization]") {
+  MergeTestController testController;
+  auto context = testController.context;
+  auto processor = testController.processor;
+  auto input = testController.input;
+  auto output = testController.output;
+
+  std::string header = "BEGIN{";
+  std::string footer = "}END";
+  std::string demarcator = "_";
+
+  core::ProcessSession session(context);
+
+  minifi::PayloadSerializer payloadSerializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+    return session.read(ff, cb);
+  });
+  minifi::FlowFileV3Serializer ffV3Serializer([&] (const std::shared_ptr<core::FlowFile>& ff, minifi::InputStreamCallback* cb) {
+    return session.read(ff, cb);
+  });
+
+  // Set by whichever SECTION runs; used to compute the expected merged content independently of MergeContent.
+  minifi::FlowFileSerializer* usedSerializer = nullptr;
+
+  std::vector<std::shared_ptr<core::FlowFile>> files;
+
+  // Three input flow files with identical attributes and different content.
+  for (const auto& content : std::vector<std::string>{"first ff content", "second ff content", "some other data"}) {
+    minifi::io::BufferStream contentStream{reinterpret_cast<const uint8_t*>(content.data()), static_cast<unsigned>(content.length())};
+    auto ff = session.create();
+    ff->removeAttribute(core::SpecialFlowAttribute::FILENAME);
+    ff->addAttribute("one", "banana");
+    ff->addAttribute("two", "seven");
+    session.importFrom(contentStream, ff);
+    session.flushContent();
+    files.push_back(ff);
+    input->put(ff);
+  }
+
+  context->setProperty(processors::MergeContent::MergeStrategy, processors::merge_content_options::MERGE_STRATEGY_BIN_PACK);
+  context->setProperty(processors::MergeContent::DelimiterStrategy, processors::merge_content_options::DELIMITER_STRATEGY_TEXT);
+  context->setProperty(processors::MergeContent::Header, header);
+  context->setProperty(processors::MergeContent::Footer, footer);
+  context->setProperty(processors::MergeContent::Demarcator, demarcator);
+  context->setProperty(processors::BinFiles::MinEntries, "3");
+
+  SECTION("Payload Serializer") {
+    context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_CONCAT_VALUE);
+    usedSerializer = &payloadSerializer;
+  }
+  SECTION("FlowFileV3 Serializer") {
+    context->setProperty(processors::MergeContent::MergeFormat, processors::merge_content_options::MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE);
+    usedSerializer = &ffV3Serializer;
+    // only Binary Concatenation takes these into account
+    header = "";
+    demarcator = "";
+    footer = "";
+  }
+  REQUIRE(usedSerializer != nullptr);
+
+  // Expected output built by hand: header, serialized flow files separated by the demarcator, footer.
+  auto result = std::make_shared<minifi::io::BufferStream>();
+  writeString(header, result);
+  bool first = true;
+  for (const auto& ff : files) {
+    if (!first) {
+      writeString(demarcator, result);
+    }
+    first = false;
+    REQUIRE(usedSerializer->serialize(ff, result) >= 0);
+  }
+  writeString(footer, result);
+
+  std::string expected{reinterpret_cast<const char*>(result->getBuffer()), result->size()};
+
+  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+  processor->onSchedule(context, factory);
+  for (int i = 0; i < 3; i++) {
+    auto mergeSession = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, mergeSession);
+    mergeSession->commit();
+  }
+
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  std::shared_ptr<core::FlowFile> flow = output->poll(expiredFlowRecords);
+
+  REQUIRE(expiredFlowRecords.empty());
+  // Fail with a clear assertion, rather than a null dereference, if nothing was merged.
+  REQUIRE(flow != nullptr);
+  {
+    FixedBuffer callback(flow->getSize());
+    session.read(flow, &callback);
+    REQUIRE(callback.to_string() == expected);
+  }
+
+  LogTestController::getInstance().reset();
+}
diff --git a/libminifi/test/unit/FlowFileSerializationTests.cpp b/libminifi/test/unit/FlowFileSerializationTests.cpp
new file mode 100644
index 0000000..d89e335
--- /dev/null
+++ b/libminifi/test/unit/FlowFileSerializationTests.cpp
@@ -0,0 +1,85 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <algorithm>
+#include <memory>
+#include <string>
+
+#include "io/BaseStream.h"
+#include "serialization/FlowFileV3Serializer.h"
+#include "serialization/PayloadSerializer.h"
+#include "core/FlowFile.h"
+#include "../TestBase.h"
+
+// Returns a new FlowFileRecord with the FILENAME attribute removed (presumably populated on
+// construction), so each test controls the complete attribute set that gets serialized.
+// Internal linkage: this helper is local to this test file.
+static std::shared_ptr<minifi::FlowFileRecord> createEmptyFlowFile() {
+  auto flowFile = std::make_shared<minifi::FlowFileRecord>();
+  flowFile->removeAttribute(core::SpecialFlowAttribute::FILENAME);
+  return flowFile;
+}
+
+TEST_CASE("Payload Serializer", "[testPayload]") {
+  const std::string content = "flowFileContent";
+
+  // Backing stream for the flow file's content; the reader below hands it straight to the serializer.
+  auto source = std::make_shared<minifi::io::BufferStream>();
+  source->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), content.length());
+
+  auto flowFile = createEmptyFlowFile();
+  flowFile->setSize(content.size());
+  flowFile->addAttribute("first", "one");
+  flowFile->addAttribute("second", "two");
+
+  minifi::PayloadSerializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
+    return cb->process(source);
+  });
+
+  auto sink = std::make_shared<minifi::io::BufferStream>();
+  serializer.serialize(flowFile, sink);
+
+  // The attributes must not appear: the output is expected to be exactly the raw content.
+  const std::string serialized{reinterpret_cast<const char*>(sink->getBuffer()), sink->size()};
+  REQUIRE(serialized == content);
+}
+
+TEST_CASE("FFv3 Serializer", "[testFFv3]") {
+  const std::string content = "flowFileContent";
+
+  // Backing stream for the flow file's content; the reader below hands it straight to the serializer.
+  auto source = std::make_shared<minifi::io::BufferStream>();
+  source->write(const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(content.data())), content.length());
+
+  auto flowFile = createEmptyFlowFile();
+  flowFile->setSize(content.size());
+  flowFile->addAttribute("first", "one");
+  flowFile->addAttribute("second", "two");
+
+  minifi::FlowFileV3Serializer serializer([&] (const std::shared_ptr<core::FlowFile>&, minifi::InputStreamCallback* cb) {
+    return cb->process(source);
+  });
+
+  auto sink = std::make_shared<minifi::io::BufferStream>();
+  serializer.serialize(flowFile, sink);
+  const std::string serialized{reinterpret_cast<const char*>(sink->getBuffer()), sink->size()};
+
+  // Expected FlowFile v3 encoding; the length and size fields are big-endian.
+  const std::string expected =
+      std::string("NiFiFF3") +
+      std::string("\x00\x02", 2) +                          // number of attributes
+      std::string("\x00\x05", 2) + "first" +                // first key
+      std::string("\x00\x03", 2) + "one" +                  // first value
+      std::string("\x00\x06", 2) + "second" +               // second key
+      std::string("\x00\x03", 2) + "two" +                  // second value
+      std::string("\x00\x00\x00\x00\x00\x00\x00\x0f", 8) +  // content size (15 bytes)
+      content;                                              // payload of the flowFile
+
+  REQUIRE(serialized == expected);
+}
+