You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/06/15 13:42:31 UTC

[nifi-minifi-cpp] 01/02: MINIFICPP-1507 convert InputStream::read to size_t

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 13a43e678e445a88726ac7d56b1670e717cf73ba
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed May 26 16:02:06 2021 +0200

    MINIFICPP-1507 convert InputStream::read to size_t
    
    Co-authored-by: adamdebreceni <64...@users.noreply.github.com>
    Co-authored-by: Ferenc Gerlits <fg...@users.noreply.github.com>
    
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1028
---
 extensions/aws/processors/PutS3Object.h            |  11 +-
 extensions/civetweb/processors/ListenHTTP.h        |  13 +-
 extensions/coap/protocols/CoapC2Protocol.h         |   2 +-
 extensions/http-curl/client/HTTPCallback.h         |   2 +-
 extensions/http-curl/client/HTTPStream.cpp         |   9 +-
 extensions/http-curl/client/HTTPStream.h           |   4 +-
 extensions/http-curl/tests/CivetStream.h           |   8 +-
 extensions/http-curl/tests/HTTPHandlers.h          |  50 +++--
 extensions/jni/jvm/JniReferenceObjects.h           |   3 +-
 extensions/libarchive/CompressContent.h            |  79 ++++----
 extensions/libarchive/FocusArchiveEntry.cpp        |   9 +-
 extensions/librdkafka/PublishKafka.cpp             |   7 +-
 extensions/mqtt/processors/ConvertJSONAck.h        |  13 +-
 extensions/mqtt/processors/ConvertUpdate.cpp       |   2 +-
 extensions/mqtt/processors/PublishMQTT.h           |  23 ++-
 extensions/opc/src/putopc.cpp                      |  15 +-
 extensions/rocksdb-repos/RocksDbStream.cpp         |  35 ++--
 extensions/rocksdb-repos/RocksDbStream.h           |   4 +-
 extensions/script/lua/LuaBaseStream.cpp            |  15 +-
 extensions/script/python/PyBaseStream.cpp          |   8 +-
 extensions/script/python/PyBaseStream.h            |   2 +-
 extensions/sftp/client/SFTPClient.cpp              |  14 +-
 .../processors/ExecuteProcess.cpp                  |  17 +-
 .../standard-processors/processors/ExtractText.cpp |  27 +--
 .../standard-processors/processors/GetTCP.cpp      |  12 +-
 .../standard-processors/processors/LogAttribute.h  |  20 +-
 .../standard-processors/processors/PutFile.cpp     |  25 +--
 .../standard-processors/processors/PutFile.h       |   2 +-
 .../standard-processors/processors/TailFile.cpp    |   2 +-
 extensions/tensorflow/TFApplyGraph.cpp             |  20 +-
 extensions/tensorflow/TFConvertImageToTensor.cpp   |   6 +-
 extensions/tensorflow/TFExtractTopLabels.cpp       |  19 +-
 libminifi/include/io/AtomicEntryStream.h           |  13 +-
 libminifi/include/io/BufferStream.h                |   6 +-
 libminifi/include/io/CRCStream.h                   |   6 +-
 libminifi/include/io/ClientSocket.h                |   4 +-
 libminifi/include/io/DescriptorStream.h            |   4 +-
 libminifi/include/io/FileStream.h                  |   4 +-
 libminifi/include/io/InputStream.h                 |  38 ++--
 libminifi/include/io/Stream.h                      |  13 +-
 libminifi/include/io/StreamPipe.h                  |  12 +-
 libminifi/include/io/tls/SecureDescriptorStream.h  |   4 +-
 libminifi/include/io/tls/TLSSocket.h               |   4 +-
 libminifi/include/provenance/Provenance.h          |  16 +-
 libminifi/include/sitetosite/Peer.h                |   2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |  95 ++++------
 libminifi/include/utils/ByteArrayCallback.h        |   7 +-
 libminifi/include/utils/Enum.h                     |   4 +-
 libminifi/src/FlowControlProtocol.cpp              |  22 +--
 libminifi/src/FlowFileRecord.cpp                   |  90 +++++----
 libminifi/src/c2/ControllerSocketProtocol.cpp      |  77 ++++----
 libminifi/src/core/ProcessSession.cpp              |  10 +-
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  13 +-
 libminifi/src/io/BufferStream.cpp                  |  15 +-
 libminifi/src/io/ClientSocket.cpp                  |  21 ++-
 libminifi/src/io/DescriptorStream.cpp              |  15 +-
 libminifi/src/io/FileStream.cpp                    |  24 +--
 libminifi/src/io/InputStream.cpp                   |  58 +++---
 libminifi/src/io/tls/SecureDescriptorStream.cpp    |  44 +++--
 libminifi/src/io/tls/TLSSocket.cpp                 |  40 ++--
 libminifi/src/provenance/Provenance.cpp            | 196 ++++++++++++--------
 libminifi/src/sitetosite/RawSocketProtocol.cpp     | 206 +++++++++++----------
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  85 ++++-----
 libminifi/src/utils/ByteArrayCallback.cpp          |  10 +-
 libminifi/test/BufferReader.h                      |  12 +-
 .../test/archive-tests/CompressContentTests.cpp    |  45 +++--
 libminifi/test/archive-tests/MergeFileTests.cpp    |  11 +-
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  10 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  10 +-
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |   2 +-
 libminifi/test/unit/FileStreamTests.cpp            |  28 +--
 libminifi/test/unit/SiteToSiteHelper.h             |   2 +-
 nanofi/src/api/nanofi.cpp                          |   2 +-
 nanofi/tests/CSite2SiteTests.cpp                   |  13 +-
 74 files changed, 912 insertions(+), 859 deletions(-)

diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index af983db..fb14d2b 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -31,6 +31,7 @@
 
 #include "S3Processor.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
 
 template<typename T>
 class S3TestsFixture;
@@ -94,20 +95,20 @@ class PutS3Object : public S3Processor {
       auto data_stream = std::make_shared<std::stringstream>();
       read_size_ = 0;
       while (read_size_ < flow_size_) {
-        auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
-        int read_ret = stream->read(buffer, next_read_size);
-        if (read_ret < 0) {
+        const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
+        const auto read_ret = stream->read(buffer, next_read_size);
+        if (io::isError(read_ret)) {
           return -1;
         }
         if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
+          data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
           read_size_ += read_ret;
         } else {
           break;
         }
       }
       result_ = s3_wrapper_.putObject(options_, data_stream);
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
 
     uint64_t flow_size_;
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index 57cc1dd..de522c3 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -34,6 +34,7 @@
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/MinifiConcurrentQueue.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -58,7 +59,7 @@ class ListenHTTP : public core::Processor {
     callbacks_.log_access = &logAccess;
   }
   // Destructor
-  virtual ~ListenHTTP();
+  ~ListenHTTP() override;
   // Processor Name
   static constexpr char const *ProcessorName = "ListenHTTP";
   // Supported Properties
@@ -133,14 +134,12 @@ class ListenHTTP : public core::Processor {
     }
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       out_str_->resize(stream->size());
-      uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
-                                           gsl::narrow<int>(stream->size()));
-
+      const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size());
       if (num_read != stream->size()) {
         throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
       }
 
-      return num_read;
+      return gsl::narrow<int64_t>(num_read);
     }
 
    private:
@@ -161,11 +160,11 @@ class ListenHTTP : public core::Processor {
     try {
       struct mg_context* ctx = mg_get_context(conn);
       /* CivetServer stores 'this' as the userdata when calling mg_start */
-      CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
+      auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx));
       if (server == nullptr) {
         return 0;
       }
-      std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
+      auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
       if (logger == nullptr) {
         return 0;
       }
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
index 03ce24b..c19ffd5 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -49,7 +49,7 @@ namespace coap {
 namespace c2 {
 
 #define REQUIRE_VALID(x) \
-  if (-1 == x) { \
+  if (io::isError(x)) { \
     return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR); \
   }
 
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 3a73cd5..60ed3b3 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -84,7 +84,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
 
     if (stream->size() > 0) {
       vec.resize(stream->size());
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
     return processInner(std::move(vec));
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 218d200..9366f91 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -47,7 +47,7 @@ void HttpStream::close() {
   http_read_callback_.close();
 }
 
-void HttpStream::seek(uint64_t /*offset*/) {
+void HttpStream::seek(size_t /*offset*/) {
   // seek is an unnecessary part of this implementatino
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
@@ -77,8 +77,7 @@ int HttpStream::write(const uint8_t *value, int size) {
   }
 }
 
-int HttpStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t HttpStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -93,10 +92,10 @@ int HttpStream::read(uint8_t *buf, int buflen) {
         started_ = true;
       }
     }
-    return gsl::narrow<int>(http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen));
+    return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
 
   } else {
-    return -1;
+    return STREAM_ERROR;
   }
 }
 
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index b034252..b23d515 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -76,7 +76,7 @@ class HttpStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return written;
@@ -90,7 +90,7 @@ class HttpStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index ce1c15d..d6e66e7 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -21,6 +21,8 @@
 
 #include "io/BaseStream.h"
 #include "civetweb.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -42,8 +44,10 @@ class CivetStream : public io::InputStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override {
-    return mg_read(conn, buf, buflen);
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = mg_read(conn, buf, buflen);
+    if (ret < 0) return STREAM_ERROR;
+    return gsl::narrow<size_t>(ret);
   }
 
  protected:
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 6f71ade..890acb7 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -213,37 +213,51 @@ class FlowFileResponder : public ServerAwareHandler {
     if (!wrong_uri) {
       minifi::io::CivetStream civet_stream(conn);
       minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
-      uint32_t num_attributes;
-      int read;
+      uint32_t num_attributes = 0;
       uint64_t total_size = 0;
-      read = stream.read(num_attributes);
-      if (!isServerRunning())return false;
-      assert(read > 0); total_size += read;
+      {
+        const auto read = stream.read(num_attributes);
+        if (!isServerRunning()) return false;
+        assert(read > 0);
+        total_size += read;
+      }
 
-      auto flow = std::make_shared<FlowObj>();
+      const auto flow = std::make_shared<FlowObj>();
 
       for (uint32_t i = 0; i < num_attributes; i++) {
         std::string name, value;
-        read = stream.read(name, true);
-        if (!isServerRunning())return false;
-        assert(read > 0); total_size += read;
-        read = stream.read(value, true);
-        if (!isServerRunning())return false;
-        assert(read > 0); total_size += read;
+        {
+          const auto read = stream.read(name, true);
+          if (!isServerRunning()) return false;
+          assert(read > 0);
+          total_size += read;
+        }
+        {
+          const auto read = stream.read(value, true);
+          if (!isServerRunning()) return false;
+          assert(read > 0);
+          total_size += read;
+        }
         flow->attributes[name] = value;
       }
       uint64_t length;
-      read = stream.read(length);
-      if (!isServerRunning())return false;
-      assert(read > 0); total_size += read;
+      {
+        const auto read = stream.read(length);
+        if (!isServerRunning()) return false;
+        assert(read > 0);
+        total_size += read;
+      }
 
       total_size += length;
       flow->data.resize(gsl::narrow<size_t>(length));
       flow->total_size = total_size;
 
-      read = stream.read(flow->data.data(), gsl::narrow<int>(length));
-      if (!isServerRunning())return false;
-      assert(read == gsl::narrow<int>(length));
+      {
+        const auto read = stream.read(flow->data.data(), length);
+        if (!isServerRunning()) return false;
+        (void)read;
+        assert(read == length);
+      }
 
       if (!invalid_checksum) {
         site2site_rest_resp = std::to_string(stream.getCRC());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index a55ffcc..5f994bb 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -139,7 +139,8 @@ class JniByteInputStream : public minifi::InputStreamCallback {
     int writtenOffset = 0;
     int read = 0;
     do {
-      int actual = stream_->read(buffer_, std::min(remaining, buffer_size_));
+      // JNI takes size as int, there's not much we can do here to support 2GB+ sizes
+      int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_)));
       if (actual <= 0) {
         if (read == 0) {
           stream_ = nullptr;
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 17c406b..9e06aa6 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -21,6 +21,7 @@
 #define __COMPRESS_CONTENT_H__
 
 #include <cinttypes>
+#include <utility>
 
 #include "archive_entry.h"
 #include "archive.h"
@@ -43,7 +44,7 @@ namespace minifi {
 namespace processors {
 
 // CompressContent Class
-class CompressContent: public core::Processor {
+class CompressContent : public core::Processor {
 public:
   // Constructor
   /*!
@@ -56,7 +57,7 @@ public:
     , encapsulateInTar_(false) {
   }
   // Destructor
-  virtual ~CompressContent() = default;
+  ~CompressContent() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "CompressContent";
   // Supported Properties
@@ -94,8 +95,8 @@ public:
     ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
         flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
     }
-    ~ReadCallbackCompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallbackCompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       uint8_t buffer[4096U];
       int64_t ret = 0;
       uint64_t read_size = 0;
@@ -107,24 +108,24 @@ public:
         return -1;
       }
       while (read_size < flow_->getSize()) {
-        ret = stream->read(buffer, sizeof(buffer));
-        if (ret < 0) {
+        const auto readret = stream->read(buffer, sizeof(buffer));
+        if (io::isError(readret)) {
           status_ = -1;
           return -1;
         }
-        if (ret > 0) {
-          ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
+        if (readret > 0) {
+          ret = archive_write_data(arch_, buffer, readret);
           if (ret < 0) {
             logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
             status_ = -1;
             return -1;
           }
-          read_size += ret;
+          read_size += gsl::narrow<uint64_t>(ret);
         } else {
           break;
         }
       }
-      return read_size;
+      return gsl::narrow<int64_t>(read_size);
     }
     std::shared_ptr<core::FlowFile> flow_;
     struct archive *arch_;
@@ -133,28 +134,24 @@ public:
     std::shared_ptr<logging::Logger> logger_;
   };
   // Nest Callback Class for read stream from flow for decompress
-  class ReadCallbackDecompress: public InputStreamCallback {
-  public:
-    ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
-        read_size_(0), offset_(0), flow_(flow) {
-      origin_offset_ = flow_->getOffset();
+  struct ReadCallbackDecompress : InputStreamCallback {
+    explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
+        flow_file(std::move(flow)) {
     }
-    ~ReadCallbackDecompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      read_size_ = 0;
-      stream->seek(offset_);
-      int readRet = stream->read(buffer_, sizeof(buffer_));
-      read_size_ = readRet;
-      if (readRet > 0) {
-        offset_ += read_size_;
+    ~ReadCallbackDecompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      stream->seek(offset);
+      const auto readRet = stream->read(buffer, sizeof(buffer));
+      stream_read_result = readRet;
+      if (!io::isError(readRet)) {
+        offset += readRet;
       }
-      return readRet;
+      return gsl::narrow<int64_t>(readRet);
     }
-    int64_t read_size_;
-    uint8_t buffer_[8192];
-    uint64_t offset_;
-    uint64_t origin_offset_;
-    std::shared_ptr<core::FlowFile> flow_;
+    size_t stream_read_result = 0;  // read size or error code, to be checked with io::isError
+    uint8_t buffer[8192] = {0};
+    size_t offset = 0;
+    std::shared_ptr<core::FlowFile> flow_file;
   };
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
@@ -190,16 +187,15 @@ public:
       return ret;
     }
 
-    static la_ssize_t archive_read(struct archive *arch, void *context, const void **buff) {
-      WriteCallback *callback = (WriteCallback *) context;
+    static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) {
+      auto *callback = (WriteCallback *) context;
       callback->session_->read(callback->flow_, &callback->readDecompressCb_);
-      if (callback->readDecompressCb_.read_size_ >= 0) {
-        *buff = callback->readDecompressCb_.buffer_;
-        return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
-      } else {
-        archive_set_error(arch, EIO, "Error reading flowfile");
+      *buff = callback->readDecompressCb_.buffer;
+      if (io::isError(callback->readDecompressCb_.stream_read_result)) {
+        archive_set_error(archive, EIO, "Error reading flowfile");
         return -1;
       }
+      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result);
     }
 
     static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -383,13 +379,14 @@ public:
           std::vector<uint8_t> buffer(16 * 1024U);
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
-            int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-            if (ret < 0) {
+            const auto ret = inputStream->read(buffer.data(), buffer.size());
+            if (io::isError(ret)) {
               return -1;
             } else if (ret == 0) {
               break;
             } else {
-              if (outputStream_->write(buffer.data(), ret) != ret) {
+              const auto writeret = outputStream_->write(buffer.data(), ret);
+              if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
                 return -1;
               }
               read_size += ret;
@@ -414,7 +411,7 @@ public:
 
       success_ = filterStream->isFinished();
 
-      return flow_->getSize();
+      return gsl::narrow<int64_t>(flow_->getSize());
     }
   };
 
@@ -444,7 +441,7 @@ private:
   }
 
   std::shared_ptr<logging::Logger> logger_;
-  int compressLevel_;
+  int compressLevel_{};
   CompressionMode compressMode_;
   ExtendedCompressionFormat compressFormat_;
   bool updateFileName_;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 7a91e2c..04bdb2c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -34,6 +34,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -149,20 +150,20 @@ typedef struct {
 la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
   auto data = static_cast<FocusArchiveEntryReadData *>(d);
   *buf = data->buf;
-  int read = 0;
-  int last_read = 0;
+  size_t read = 0;
+  size_t last_read = 0;
 
   do {
     last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
     read += last_read;
-  } while (data->processor->isRunning() && last_read > 0 && read < 8196);
+  } while (data->processor->isRunning() && last_read > 0 && !io::isError(last_read) && read < 8196);
 
   if (!data->processor->isRunning()) {
     archive_set_error(a, EINTR, "Processor shut down during read");
     return -1;
   }
 
-  return read;
+  return gsl::narrow<la_ssize_t>(read);
 }
 
 int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b83d028..dd7bac8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback {
     }
 
     for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
-      const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-      if (readRet < 0) {
+      const auto readRet = stream->read(buffer.data(), buffer.size());
+      if (io::isError(readRet)) {
         status_ = -1;
         error_ = "Failed to read from stream";
         return read_size_;
       }
-
-      if (readRet <= 0) { break; }
+      if (readRet == 0) { break; }
 
       const auto err = produce(segment_num, buffer, readRet);
       if (err) {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 47f53d0..6c9d3cd 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -31,6 +31,8 @@
 #include "MQTTClient.h"
 #include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase {
         logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
   }
   // Destructor
-  virtual ~ConvertJSONAck() = default;
+  ~ConvertJSONAck() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "ConvertJSONAck";
 
@@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase {
   class ReadCallback : public InputStreamCallback {
    public:
     ReadCallback() = default;
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (nullptr == stream)
         return 0;
       buffer_.resize(stream->size());
-      ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
-      return ret;
+      const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
+      return !io::isError(ret) ? gsl::narrow<int64_t>(ret) : -1;
     }
     std::vector<char> buffer_;
   };
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index 3460de9..9f3b456 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   bool received_update = false;
   while (mqtt_service_->get(100, listening_topic, update)) {
     // first we have the input topic string followed by the update URI
-    if (update.size() > 0) {
+    if (!update.empty()) {
       io::BufferStream stream(update.data(), update.size());
 
       std::string returnTopic, url;
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index b140c74..04368c7 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -20,6 +20,8 @@
 #ifndef __PUBLISH_MQTT_H__
 #define __PUBLISH_MQTT_H__
 
+#include <limits>
+
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -29,6 +31,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "MQTTClient.h"
 #include "AbstractMQTTProcessor.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -50,7 +53,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
     max_seg_size_ = ULLONG_MAX;
   }
   // Destructor
-  virtual ~PublishMQTT() = default;
+  ~PublishMQTT() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "PublishMQTT";
   // Supported Properties
@@ -74,35 +77,37 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       read_size_ = 0;
     }
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
+      gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
       std::vector<unsigned char> buffer(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       while (read_size_ < flow_size_) {
-        int readRet = stream->read(&buffer[0], max_seg_size_);
-        if (readRet < 0) {
+        // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
+        const auto readRet = stream->read(&buffer[0], max_seg_size_);
+        if (io::isError(readRet)) {
           status_ = -1;
-          return read_size_;
+          return gsl::narrow<int64_t>(read_size_);
         }
         if (readRet > 0) {
           MQTTClient_message pubmsg = MQTTClient_message_initializer;
           pubmsg.payload = &buffer[0];
-          pubmsg.payloadlen = readRet;
+          pubmsg.payloadlen = gsl::narrow<int>(readRet);
           pubmsg.qos = qos_;
           pubmsg.retained = retain_;
           if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
             status_ = -1;
             return -1;
           }
-          read_size_ += readRet;
+          read_size_ += gsl::narrow<size_t>(readRet);
         } else {
           break;
         }
       }
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
     uint64_t flow_size_;
     uint64_t max_seg_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index ef1be63..2e09369 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -37,6 +37,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 #include "utils/StringUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -427,21 +428,15 @@ namespace processors {
     uint64_t size = 0;
 
     do {
-      int read = stream->read(buf_.data() + size, 1024);
-
-      if (read < 0) {
-        return -1;
-      }
-
-      if (read == 0) {
-        break;
-      }
+      const auto read = stream->read(buf_.data() + size, 1024);
+      if (io::isError(read)) return -1;
+      if (read == 0) break;
       size += read;
     } while (size < stream->size());
 
     logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size());
 
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 
 } /* namespace processors */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 86daa20..735692b 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
 void RocksDbStream::close() {
 }
 
-void RocksDbStream::seek(uint64_t /*offset*/) {
+void RocksDbStream::seek(size_t /*offset*/) {
   // noop
 }
 
@@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) {
   }
 }
 
-int RocksDbStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  if (!exists_) {
-    return -1;
-  }
-  if (buflen == 0) {
-    return 0;
-  }
-  if (!IsNullOrEmpty(buf)) {
-    size_t amtToRead = gsl::narrow<size_t>(buflen);
-    if (offset_ >= value_.size()) {
-      return 0;
-    }
-    if (amtToRead > value_.size() - offset_) {
-      amtToRead = value_.size() - offset_;
-    }
-    std::memcpy(buf, value_.data() + offset_, amtToRead);
-    offset_ += amtToRead;
-    return gsl::narrow<int>(amtToRead);
-  } else {
-    return -1;
-  }
+size_t RocksDbStream::read(uint8_t *buf, size_t buflen) {
+  // The check have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed
+  if (!exists_) return STREAM_ERROR;
+  if (buflen == 0) return 0;
+  if (IsNullOrEmpty(buf)) return STREAM_ERROR;
+  if (offset_ >= value_.size()) return 0;
+
+  const auto amtToRead = std::min(buflen, value_.size() - offset_);
+  std::memcpy(buf, value_.data() + offset_, amtToRead);
+  offset_ += amtToRead;
+  return amtToRead;
 }
 
 } /* namespace io */
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 6b4a525..cf4946f 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return size_;
@@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaBaseStream.cpp
index 1387fae..3545c77 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -39,7 +39,7 @@ std::string LuaBaseStream::read(size_t len) {
   }
 
   if (len <= 0) {
-    return nullptr;
+    return std::string{};
   }
 
   std::string buffer;
@@ -52,16 +52,11 @@ std::string LuaBaseStream::read(size_t len) {
   //     0 <= n < s.size()."
   //
   // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3337.pdf
-  auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), static_cast<int>(len));
-  if (read < 0) {
-    return nullptr;
+  const auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), len);
+  if (!io::isError(read) && read != len) {
+    buffer.resize(read);
   }
-
-  if (gsl::narrow<size_t>(read) != len) {
-    buffer.resize(gsl::narrow<size_t>(read));
-  }
-
-  return buffer;
+  return io::isError(read) ? std::string{} : buffer;
 }
 
 size_t LuaBaseStream::write(std::string buf) {
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index fcde480..5e583f7 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) {
 
   std::vector<uint8_t> buffer(len);
 
-  auto read = stream_->read(buffer.data(), static_cast<int>(len));
-  auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
-
-  return result;
+  const auto read = stream_->read(buffer.data(), len);
+  return py::bytes(reinterpret_cast<char *>(buffer.data()), read);
 }
 
-size_t PyBaseStream::write(py::bytes buf) {
+size_t PyBaseStream::write(const py::bytes& buf) {
   const auto &&buf_str = buf.operator std::string();
   return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())),
                                                 static_cast<int>(buf_str.length())));
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h
index f07fe80..1f11065 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyBaseStream.h
@@ -37,7 +37,7 @@ class PyBaseStream {
 
   py::bytes read();
   py::bytes read(size_t len = 0);
-  size_t write(py::bytes buf);
+  size_t write(const py::bytes& buf);
 
  private:
   std::shared_ptr<io::BaseStream> stream_;
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index e657d2e..7af066d 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     return true;
   }
 
-  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE);
+  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
   std::vector<uint8_t> buf(buf_size);
   uint64_t total_read = 0U;
   do {
-    int read_ret = input.read(buf.data(), buf.size());
-    if (read_ret < 0) {
+    const auto read_ret = input.read(buf.data(), buf.size());
+    if (io::isError(read_ret)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
       return false;
@@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     }
     logger_->log_trace("Read %d bytes", read_ret);
     total_read += read_ret;
-    ssize_t remaining = read_ret;
+    auto remaining = read_ret;
     while (remaining > 0) {
-      int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
+      const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
         last_error_.setSftpError(SFTPError::IoError);
         logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
         return false;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
-      remaining -= write_ret;
+      remaining -= gsl::narrow<size_t>(write_ret);
     }
   } while (true);
 
-  if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
+  if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
     last_error_.setLibssh2Error(LIBSSH2_FX_OK);
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 9350920..902d369 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -27,6 +27,7 @@
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
 #include "core/TypedValues.h"
+#include "utils/gsl.h"
 
 #if defined(__clang__)
 #pragma clang diagnostic push
@@ -160,11 +161,11 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
           while (1) {
             std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
             char buffer[4096];
-            int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
             if (numRead <= 0)
               break;
-            logger_->log_debug("Execute Command Respond %d", numRead);
-            ExecuteProcess::WriteCallback callback(buffer, numRead);
+            logger_->log_debug("Execute Command Respond %zd", numRead);
+            ExecuteProcess::WriteCallback callback(buffer, gsl::narrow<uint64_t>(numRead));
             auto flowFile = session->create();
             if (!flowFile)
               continue;
@@ -177,13 +178,13 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
         } else {
           char buffer[4096];
           char *bufPtr = buffer;
-          int totalRead = 0;
+          size_t totalRead = 0;
           std::shared_ptr<core::FlowFile> flowFile = nullptr;
           while (true) {
-            int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
+            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
             if (numRead <= 0) {
               if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %d", totalRead);
+                logger_->log_debug("Execute Command Respond %zu", totalRead);
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
@@ -200,9 +201,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               }
               break;
             } else {
-              if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) {
+              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
                 // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %d", sizeof(buffer));
+                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
                   flowFile = session->create();
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 345b84a..23e0fae 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -32,8 +32,8 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/FlowFile.h"
-
 #include "utils/RegexUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
 }
 
 int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  int64_t ret = 0;
-  uint64_t read_size = 0;
+  size_t read_size = 0;
   bool regex_mode;
-  uint64_t size_limit = flowFile_->getSize();
+  size_t size_limit = flowFile_->getSize();
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   if (sizeLimitStr.empty())
     size_limit = DEFAULT_SIZE_LIMIT;
   else if (sizeLimitStr != "0")
-    size_limit = std::stoi(sizeLimitStr);
+    size_limit = static_cast<size_t>(std::stoi(sizeLimitStr));
 
   std::ostringstream contentStream;
 
   while (read_size < size_limit) {
     // Don't read more than config limit or the size of the buffer
-    int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size()));
-    ret = stream->read(buffer_, length);
+    const auto length = std::min(size_limit - read_size, buffer_.size());
+    const auto ret = stream->read(buffer_, length);
 
-    if (ret < 0) {
+    if (io::isError(ret)) {
       return -1;  // Stream error
     } else if (ret == 0) {
       break;  // End of stream, no more data
     }
 
-    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret);
+    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
     read_size += ret;
     if (contentStream.fail()) {
       return -1;
@@ -162,9 +161,11 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
 
-    int maxCaptureSizeProperty;
-    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty);
-    size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty);
+    const size_t maxCaptureSize = [this] {
+      uint64_t val;
+      ctx_->getProperty(MaxCaptureGroupLen.getName(), val);
+      return gsl::narrow<size_t>(val);
+    }();
 
     std::string contentStr = contentStream.str();
 
@@ -212,7 +213,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   } else {
     flowFile_->setAttribute(attrKey, contentStream.str());
   }
-  return read_size;
+  return gsl::narrow<int64_t>(read_size);
 }
 
 ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx,  std::shared_ptr<logging::Logger> lgr)
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index 749d515..1c93a8e 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -166,12 +166,12 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
       do {
         if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
           buffer.resize(receive_buffer_size_);
-          int size_read = socket_ptr->read(buffer.data(), gsl::narrow<int>(receive_buffer_size_), false);
-          if (size_read >= 0) {
-            if (size_read > 0) {
+          const auto size_read = socket_ptr->read(buffer.data(), receive_buffer_size_, false);
+          if (!io::isError(size_read)) {
+            if (size_read != 0) {
               // determine cut location
-              int startLoc = 0, i = 0;
-              for (; i < size_read; i++) {
+              size_t startLoc = 0;
+              for (size_t i = 0; i < size_read; i++) {
                 if (buffer.at(i) == endOfMessageByte && i > 0) {
                   if (i-startLoc > 0) {
                     handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true);
@@ -193,7 +193,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
               reconnects = 0;
             }
             socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == -2 && stay_connected_) {
+          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
             if (++reconnects > connection_attempt_limit_) {
               logger_->log_info("Too many reconnects, exiting thread");
               socket_ptr->close();
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 31445d6..ff886d0 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -47,14 +47,14 @@ class LogAttribute : public core::Processor {
    * Create a new processor
    */
   explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {})
-      : Processor(std::move(name), uuid),
+      : Processor(name, uuid),
         flowfiles_to_log_(1),
         hexencode_(false),
         max_line_length_(80U),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }
   // Destructor
-  virtual ~LogAttribute() = default;
+  ~LogAttribute() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "LogAttribute";
   // Supported Properties
@@ -103,16 +103,14 @@ class LogAttribute : public core::Processor {
         : logger_(std::move(logger))
         , buffer_(size)  {
     }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      if (buffer_.size() == 0U) {
-        return 0U;
-      }
-      int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size()));
-      if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) {
-        logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (buffer_.empty()) return 0U;
+      const auto ret = stream->read(buffer_.data(), buffer_.size());
+      if (ret != buffer_.size()) {
+        logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
         throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
       }
-      return buffer_.size();
+      return gsl::narrow<int64_t>(buffer_.size());
     }
     std::shared_ptr<logging::Logger> logger_;
     std::vector<uint8_t> buffer_;
@@ -123,7 +121,7 @@ class LogAttribute : public core::Processor {
   // OnTrigger method, implemented by NiFi LogAttribute
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi LogAttribute
-  void initialize(void) override;
+  void initialize() override;
 
  private:
   core::annotation::Input getInputRequirement() const override {
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 33e223c..9efae99 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -25,10 +25,12 @@
 #include <memory>
 #include <string>
 #include <set>
+#include <utility>
 #ifdef WIN32
 #include <Windows.h>
 #endif
 #include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
 }
 #endif
 
-PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
-    : tmp_file_(tmp_file),
-      dest_file_(dest_file) {
+PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
+    : tmp_file_(std::move(tmp_file)),
+      dest_file_(std::move(dest_file)) {
 }
 
 // Copy the entire file contents to the temporary file
@@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
   std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
 
   do {
-    int read = stream->read(buffer, 1024);
-
-    if (read < 0) {
-      return -1;
-    }
-
-    if (read == 0) {
-      break;
-    }
-
-    tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+    const auto read = stream->read(buffer, 1024);
+    if (io::isError(read)) return -1;
+    if (read == 0) break;
+    tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
     size += read;
   } while (size < stream->size());
 
@@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
     write_succeeded_ = true;
   }
 
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 47e767d..47509e2 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -82,7 +82,7 @@ class PutFile : public core::Processor {
 
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(const std::string &tmp_file, const std::string &dest_file);
+    ReadCallback(std::string tmp_file, std::string dest_file);
     ~ReadCallback() override;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     bool commit();
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 836bbd2..26ea193 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -399,7 +399,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
   std::string rolling_filename_pattern_glob;
   context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
   rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
-  initial_start_position_ = utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values());
+  initial_start_position_ = InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values())};
 }
 
 void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index 4419caa..14d349e 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -16,10 +16,12 @@
  */
 
 #include "TFApplyGraph.h"
-#include <core/ProcessContext.h>
-#include <core/ProcessSession.h>
 #include <tensorflow/cc/ops/standard_ops.h>
 
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -191,29 +193,23 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
 int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string graph_proto_buf;
   graph_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
   }
-
   graph_def_->ParseFromString(graph_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index 5f94548..aea09f8 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -321,14 +321,10 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr
   if (tensor_->AllocatedBytes() < stream->size()) {
     throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
   }
-
-  auto num_read = stream->read(tensor_->flat<unsigned char>().data(),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(tensor_->flat<unsigned char>().data(), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   return num_read;
 }
 
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index 1786b6f..2f44f84 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -19,6 +19,8 @@
 
 #include "tensorflow/cc/ops/standard_ops.h"
 
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -122,7 +124,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
 }
 
 int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  int64_t total_read = 0;
+  size_t total_read = 0;
   std::string label;
   uint64_t max_label_len = 65536;
   label.resize(max_label_len);
@@ -132,9 +134,9 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
   buf.resize(buf_size);
 
   while (total_read < stream->size()) {
-    auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), static_cast<int>(buf_size));
-
-    for (auto i = 0; i < read; i++) {
+    const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), buf_size);
+    if (io::isError(read)) break;
+    for (size_t i = 0; i < read; i++) {
       if (buf[i] == '\n' || total_read + i == stream->size()) {
         labels_->emplace_back(label.substr(0, label_size));
         label_size = 0;
@@ -147,21 +149,18 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
     total_read += read;
   }
 
-  return total_read;
+  return gsl::narrow<int64_t>(total_read);
 }
 
 int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 } /* namespace processors */
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 75b2d4c..6456ff2 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
@@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() {
 }
 
 template<typename T>
-void AtomicEntryStream<T>::seek(uint64_t offset) {
+void AtomicEntryStream<T>::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(entry_lock_);
   offset_ = gsl::narrow<size_t>(offset);
 }
@@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
 }
 
 template<typename T>
-int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (nullptr != buf && !invalid_stream_) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
-    int len = buflen;
+    auto len = buflen;
     core::repository::RepoValue<T> *value;
     if (entry_->getValue(key_, &value)) {
       if (offset_ + len > value->getBufferSize()) {
@@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
       return len;
     }
   }
-  return -1;
+  return STREAM_ERROR;
 }
 
 }  // namespace io
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 3f0bfdc..7ab2b9e 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -35,7 +35,7 @@ class BufferStream : public BaseStream {
  public:
   BufferStream() = default;
 
-  BufferStream(const uint8_t *buf, const unsigned int len) {
+  BufferStream(const uint8_t *buf, const size_t len) {
     write(buf, len);
   }
 
@@ -48,7 +48,7 @@ class BufferStream : public BaseStream {
 
   int write(const uint8_t* data, int len) final;
 
-  int read(uint8_t* buffer, int len) override;
+  size_t read(uint8_t* buffer, size_t len) override;
 
   int initialize() override {
     buffer_.clear();
@@ -56,7 +56,7 @@ class BufferStream : public BaseStream {
     return 0;
   }
 
-  void seek(uint64_t offset) override {
+  void seek(size_t offset) override {
     readOffset_ += offset;
   }
 
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 2204537..b6ae751 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
  public:
   using InputStream::read;
 
-  int read(uint8_t *buf, int buflen) override {
-    int ret = child_stream_->read(buf, buflen);
-    if (ret > 0) {
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = child_stream_->read(buf, buflen);
+    if (ret > 0 && !io::isError(ret)) {
       crc_ = crc32(crc_, buf, ret);
     }
     return ret;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 84f3836..db71cba 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -159,7 +159,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  int read(uint8_t *buf, int buflen) override {
+  size_t read(uint8_t *buf, size_t buflen) override {
     return read(buf, buflen, true);
   }
 
@@ -169,7 +169,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+  virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes);
 
  protected:
   /**
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index a7ea713..ee5aec7 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 3156a30..ad7f0dc 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -63,7 +63,7 @@ class FileStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -77,7 +77,7 @@ class FileStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 60a297c..ba2342b 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -36,46 +36,46 @@ class InputStream : public virtual Stream {
     throw std::runtime_error("Querying size is not supported");
   }
   /**
-   * reads a byte array from the stream
+   * Reads a byte array from the stream. Use isError (Stream.h) to check for errors.
    * @param value reference in which will set the result
    * @param len length to read
-   * @return resulting read size
+   * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
    **/
-  virtual int read(uint8_t *value, int len) = 0;
+  virtual size_t read(uint8_t *value, size_t len) = 0;
 
-  int read(std::vector<uint8_t>& buffer, int len);
+  size_t read(std::vector<uint8_t>& buffer, size_t len);
 
   /**
-   * read string from stream
+   * Read string from stream. Use isError (Stream.h) to check for errors.
    * @param str reference string
-   * @return resulting read size
+   * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
    **/
-  int read(std::string &str, bool widen = false);
+  size_t read(std::string &str, bool widen = false);
 
   /**
-   * read a bool from stream
+   * Read a bool from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
    **/
-  int read(bool& value);
+  size_t read(bool& value);
 
   /**
-   * read a uuid from stream
+   * Read a uuid from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
    **/
-  int read(utils::Identifier& value);
+  size_t read(utils::Identifier& value);
 
   /**
-  * reads sizeof(Integral) bytes from the stream
-  * @param value reference in which will set the result
-  * @return resulting read size
-  **/
+   * Reads sizeof(Integral) bytes from the stream. Use isError (Stream.h) to check for errors.
+   * @param value reference in which will set the result
+   * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
+   **/
   template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
-  int read(Integral& value) {
+  size_t read(Integral& value) {
     uint8_t buf[sizeof(Integral)]{};
     if (read(buf, sizeof(Integral)) != sizeof(Integral)) {
-      return -1;
+      return io::STREAM_ERROR;
     }
 
     value = 0;
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e067b52..e694c2e 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -24,6 +24,17 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
+constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);
+
+inline bool isError(const size_t read_return) noexcept {
+  return read_return == STREAM_ERROR  // general error
+      || read_return == static_cast<size_t>(-2);  // Socket EAGAIN, to be refactored to eliminate this error condition
+}
+
+inline bool isError(const int write_return) noexcept {
+  return write_return == -1;
+}
+
 /**
  * All streams serialize/deserialize in big-endian
  */
@@ -31,7 +42,7 @@ class Stream {
  public:
   virtual void close() {}
 
-  virtual void seek(uint64_t /*offset*/) {
+  virtual void seek(size_t /*offset*/) {
     throw std::runtime_error("Seek is not supported");
   }
 
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b3fafce..abac015 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
   uint8_t buffer[4096U];
   int64_t totalTransferred = 0;
   while (true) {
-    int readRet = src->read(buffer, sizeof(buffer));
-    if (readRet < 0) {
-      return readRet;
-    }
-    if (readRet == 0) {
-      break;
-    }
-    int remaining = readRet;
+    const auto readRet = src->read(buffer, sizeof(buffer));
+    if (io::isError(readRet)) return -1;
+    if (readRet == 0) break;
+    auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
       int writeRet = dst->write(buffer + transferred, remaining);
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 7699910..5f285c1 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return -1;
@@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index 6bf741d..24df1d9 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -136,14 +136,14 @@ class TLSSocket : public Socket {
   using Socket::read;
   using Socket::write;
 
-  int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override;
+  size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * Write value to the stream using uint8_t ptr
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 3b7b4f7..ed1cadc 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
   uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
     const auto size = std::min<size_t>(72, bufferSize);
-    org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size));
+    org::apache::nifi::minifi::io::BufferStream outStream(buffer, size);
 
     std::string uuid;
-    int ret = outStream.read(uuid);
-
-    if (ret <= 0) {
+    const auto uuidret = outStream.read(uuid);
+    if (uuidret == 0 || io::isError(uuidret)) {
       return 0;
     }
 
     uint32_t eventType;
-    ret = outStream.read(eventType);
-    if (ret != 4) {
+    const auto typeret = outStream.read(eventType);
+    if (typeret != 4) {
       return 0;
     }
 
     uint64_t event_time;
-
-    ret = outStream.read(event_time);
-    if (ret != 8) {
+    const auto timeret = outStream.read(event_time);
+    if (timeret != 8) {
       return 0;
     }
 
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index a82e3b4..d2138ea 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     return stream_->write(data, len);
   }
 
-  int read(uint8_t* data, int len) override {
+  size_t read(uint8_t* data, size_t len) override {
     return stream_->read(data, len);
   }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index fc1dc91..d142bfa 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -19,6 +19,7 @@
 #ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 #define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 
+#include <algorithm>
 #include <map>
 #include <memory>
 #include <string>
@@ -30,6 +31,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
 #include "core/Connectable.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -43,15 +45,14 @@ namespace sitetosite {
  */
 class DataPacket {
  public:
-  DataPacket(const std::shared_ptr<logging::Logger> &logger, const std::shared_ptr<Transaction> &transaction, std::map<std::string, std::string> attributes, const std::string &payload)
-      : payload_(payload),
-        logger_reference_(logger) {
-    _size = 0;
-    transaction_ = transaction;
-    _attributes = attributes;
+  DataPacket(std::shared_ptr<logging::Logger> logger, std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> attributes, const std::string &payload)
+      : _attributes{std::move(attributes)},
+        transaction_{std::move(transaction)},
+        payload_{payload},
+        logger_reference_{std::move(logger)} {
   }
   std::map<std::string, std::string> _attributes;
-  uint64_t _size;
+  uint64_t _size{0};
   std::shared_ptr<Transaction> transaction_;
   const std::string & payload_;
   std::shared_ptr<logging::Logger> logger_reference_;
@@ -60,24 +61,10 @@ class DataPacket {
 class SiteToSiteClient : public core::Connectable {
  public:
   SiteToSiteClient()
-      : core::Connectable("SitetoSiteClient"),
-        peer_state_(IDLE),
-        _batchSendNanos(5000000000),
-        ssl_context_service_(nullptr),
-        logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
-    _supportedVersion[0] = 5;
-    _supportedVersion[1] = 4;
-    _supportedVersion[2] = 3;
-    _supportedVersion[3] = 2;
-    _supportedVersion[4] = 1;
-    _currentVersion = _supportedVersion[0];
-    _currentVersionIndex = 0;
-    _supportedCodecVersion[0] = 1;
-    _currentCodecVersion = _supportedCodecVersion[0];
-    _currentCodecVersionIndex = 0;
+      : core::Connectable("SitetoSiteClient") {
   }
 
-  virtual ~SiteToSiteClient() = default;
+  ~SiteToSiteClient() override = default;
 
   void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
     ssl_context_service_ = context_service;
@@ -189,13 +176,13 @@ class SiteToSiteClient : public core::Connectable {
     return logger_;
   }
 
-  virtual void yield() {
+  void yield() override {
   }
 
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() override {
     return running_;
   }
 
@@ -203,7 +190,7 @@ class SiteToSiteClient : public core::Connectable {
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
@@ -234,16 +221,16 @@ class SiteToSiteClient : public core::Connectable {
   virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
   // getRespondCodeContext
   virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) {
-      if (SiteToSiteRequest::respondCodeContext[i].code == code) {
-        return &SiteToSiteRequest::respondCodeContext[i];
+    for (auto & i : SiteToSiteRequest::respondCodeContext) {
+      if (i.code == code) {
+        return &i;
       }
     }
-    return NULL;
+    return nullptr;
   }
 
   // Peer State
-  PeerState peer_state_;
+  PeerState peer_state_{PeerState::IDLE};
 
   // portId
   utils::Identifier port_id_;
@@ -254,28 +241,28 @@ class SiteToSiteClient : public core::Connectable {
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
 
-  std::atomic<bool> running_;
+  std::atomic<bool> running_{false};
 
   // transaction map
   std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
 
   // BATCH_SEND_NANOS
-  uint64_t _batchSendNanos;
+  uint64_t _batchSendNanos{5000000000};
 
   /***
    * versioning
    */
-  uint32_t _supportedVersion[5];
-  uint32_t _currentVersion;
-  int _currentVersionIndex;
-  uint32_t _supportedCodecVersion[1];
-  uint32_t _currentCodecVersion;
-  int _currentCodecVersionIndex;
+  uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
+  int _currentVersionIndex{0};
+  uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
+  uint32_t _supportedCodecVersion[1] = {1};
+  int _currentCodecVersionIndex{0};
+  uint32_t _currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
 
  private:
-  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
 
 // Nest Callback Class for write stream
@@ -286,13 +273,13 @@ class WriteCallback : public OutputStreamCallback {
   }
   DataPacket *_packet;
   // void process(std::ofstream *stream) {
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     uint8_t buffer[16384];
     uint64_t len = _packet->_size;
     uint64_t total = 0;
     while (len > 0) {
-      int size = len < 16384 ? static_cast<int>(len) : 16384;
-      int ret = _packet->transaction_->getStream().read(buffer, size);
+      const auto size = std::min(len, uint64_t{16384});
+      const auto ret = _packet->transaction_->getStream().read(buffer, size);
       if (ret != size) {
         logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
         return -1;
@@ -302,7 +289,7 @@ class WriteCallback : public OutputStreamCallback {
       total += size;
     }
     logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
-    return len;
+    return gsl::narrow<int64_t>(len);
   }
 };
 // Nest Callback Class for read stream
@@ -312,29 +299,23 @@ class ReadCallback : public InputStreamCallback {
       : _packet(packet) {
   }
   DataPacket *_packet;
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     _packet->_size = 0;
     uint8_t buffer[8192] = { 0 };
-    int readSize;
     size_t size = 0;
     do {
-      readSize = stream->read(buffer, 8192);
-
-      if (readSize == 0) {
-        break;
-      }
-      if (readSize < 0) {
-        return -1;
-      }
-      int ret = _packet->transaction_->getStream().write(buffer, readSize);
-      if (ret != readSize) {
+      const auto readSize = stream->read(buffer, 8192);
+      if (readSize == 0) break;
+      if (io::isError(readSize)) return -1;
+      const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
+      if (ret < 0 || gsl::narrow<size_t>(ret) != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
         return -1;
       }
       size += readSize;
     } while (size < stream->size());
     _packet->_size = size;
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 };
 
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 84e3362..ace72b6 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,6 +24,7 @@
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback {
     if (stream->size() > 0) {
       vec.resize(stream->size());
 
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
-    return vec.size();
+    return gsl::narrow<int64_t>(vec.size());
   }
 
   virtual void seek(size_t) { }
@@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback {
 
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
-  virtual const std::vector<char> to_string();
+  virtual std::vector<char> to_string();
 
   virtual void close();
 
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index fc4754a..b452307 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -83,8 +83,8 @@ namespace utils {
 
 #define SMART_ENUM_BODY(Clazz, ...) \
     constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
-    Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
-    Clazz(const char* str) : value_{parse(str).value_} {} \
+    explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+    explicit Clazz(const char* str) : value_{parse(str).value_} {} \
    private: \
     Type value_; \
    public: \
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 8d8ad4a..79334c2 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -71,26 +71,26 @@ int FlowControlProtocol::selectClient(int msec) {
 }
 
 int FlowControlProtocol::readData(uint8_t *buf, int buflen) {
+  gsl_Expects(buflen >= 0);
   int sendSize = buflen;
 
   while (buflen) {
-    int status;
-    status = selectClient(MAX_READ_TIMEOUT);
-    if (status <= 0) {
-      return status;
+    const auto selectstatus = selectClient(MAX_READ_TIMEOUT);
+    if (selectstatus <= 0) {
+      return selectstatus;
     }
 #ifdef WIN32
-    status = _read(_socket, buf, buflen);
+    const auto readstatus = _read(_socket, buf, buflen);
 #elif !defined(__MACH__)
-    status = read(_socket, buf, buflen);
+    const auto readstatus = read(_socket, buf, gsl::narrow<size_t>(buflen));
 #else
-    status = recv(_socket, buf, buflen, 0);
+    const auto readstatus = recv(_socket, buf, buflen, 0);
 #endif
-    if (status <= 0) {
-      return status;
+    if (readstatus <= 0) {
+      return gsl::narrow<int>(readstatus);
     }
-    buflen -= status;
-    buf += status;
+    buflen -= readstatus;
+    buf += readstatus;
   }
 
   return sendSize;
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 813137b..89096a9 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -161,70 +161,90 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
 }
 
 std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
-  int ret;
-
   auto file = std::make_shared<FlowFileRecord>();
 
-  ret = inStream.read(file->event_time_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->event_time_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->entry_date_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->entry_date_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->lineage_start_date_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->lineage_start_date_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->uuid_);
-  if (ret <= 0) {
-    return {};
+  {
+    const auto ret = inStream.read(file->uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return {};
+    }
   }
 
-  ret = inStream.read(container);
-  if (ret <= 0) {
-    return {};
+  {
+    const auto ret = inStream.read(container);
+    if (ret == 0 || io::isError(ret)) {
+      return {};
+    }
   }
 
   // read flow attributes
   uint32_t numAttributes = 0;
-  ret = inStream.read(numAttributes);
-  if (ret != 4) {
-    return {};
+  {
+    const auto ret = inStream.read(numAttributes);
+    if (ret != 4) {
+      return {};
+    }
   }
 
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
-    ret = inStream.read(key, true);
-    if (ret <= 0) {
-      return {};
+    {
+      const auto ret = inStream.read(key, true);
+      if (ret == 0 || io::isError(ret)) {
+        return {};
+      }
     }
     std::string value;
-    ret = inStream.read(value, true);
-    if (ret <= 0) {
-      return {};
+    {
+      const auto ret = inStream.read(value, true);
+      if (ret == 0 || io::isError(ret)) {
+        return {};
+      }
     }
     file->attributes_[key] = value;
   }
 
   std::string content_full_path;
-  ret = inStream.read(content_full_path);
-  if (ret <= 0) {
-    return {};
+  {
+    const auto ret = inStream.read(content_full_path);
+    if (ret == 0 || io::isError(ret)) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->size_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->size_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->offset_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->offset_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
   file->claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo);
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 756c61c..049a3d0 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -94,8 +94,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::START:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->start();
@@ -108,8 +108,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::STOP:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->stop();
@@ -122,8 +122,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::CLEAR:
         {
           std::string connection;
-          int size = stream->read(connection);
-          if ( size != -1 ) {
+          const auto size = stream->read(connection);
+          if (!io::isError(size)) {
             update_sink_->clearConnection(connection);
           }
         }
@@ -131,21 +131,25 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::UPDATE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
-            logger_->log_debug("Connection broke");
-            break;
+          {
+            const auto size = stream->read(what);
+            if (io::isError(size)) {
+              logger_->log_debug("Connection broke");
+              break;
+            }
           }
           if (what == "flow") {
             std::string ff_loc;
-            int size = stream->read(ff_loc);
+            {
+              const auto size = stream->read(ff_loc);
+              if (io::isError(size)) {
+                logger_->log_debug("Connection broke");
+                break;
+              }
+            }
             std::ifstream tf(ff_loc);
             std::string configuration((std::istreambuf_iterator<char>(tf)),
                 std::istreambuf_iterator<char>());
-            if (size == -1) {
-              logger_->log_debug("Connection broke");
-              break;
-            }
             update_sink_->applyUpdate("ControllerSocketProtocol", configuration);
           }
         }
@@ -153,15 +157,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::DESCRIBE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
+          const auto size = stream->read(what);
+          if (io::isError(size)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "queue") {
             std::string connection;
-            int size = stream->read(connection);
-            if (size == -1) {
+            const auto size_ = stream->read(connection);
+            if (io::isError(size_)) {
               logger_->log_debug("Connection broke");
               break;
             }
@@ -177,8 +181,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "components") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
+            resp.write(size_);
             for (const auto &component : update_sink_->getAllComponents()) {
               resp.write(component->getComponentName());
               resp.write(component->isRunning() ? "true" : "false");
@@ -203,8 +207,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "connections") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(queue_full_.size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(queue_full_.size());
+            resp.write(size_);
             for (const auto &connection : queue_full_) {
               resp.write(connection.first, false);
             }
@@ -213,17 +217,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             std::vector<std::string> full_connections;
             {
               std::lock_guard<std::mutex> lock(controller_mutex_);
-              for (auto conn : queue_full_) {
-                if (conn.second == true) {
+              for (const auto& conn : queue_full_) {
+                if (conn.second) {
                   full_connections.push_back(conn.first);
                 }
               }
             }
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
+            const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
             resp.write(full_connection_count);
-            for (auto conn : full_connections) {
+            for (const auto& conn : full_connections) {
               resp.write(conn);
             }
             stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size()));
@@ -241,10 +245,10 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
 void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
   for (const auto &payload_content : content) {
     if (payload_content.name == "Components") {
-      for (auto content : payload_content.operation_arguments) {
-        bool is_enabled = minifi::utils::StringUtils::toBool(content.second.to_string()).value_or(false);
+      for (const auto& operation_argument : payload_content.operation_arguments) {
+        bool is_enabled = minifi::utils::StringUtils::toBool(operation_argument.second.to_string()).value_or(false);
         std::lock_guard<std::mutex> lock(controller_mutex_);
-        component_map_[content.first] = is_enabled;
+        component_map_[operation_argument.first] = is_enabled;
       }
     }
   }
@@ -253,21 +257,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse
 int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
   if (server_socket_ == nullptr)
     return 0;
-  const std::vector<C2ContentResponse> &content = payload.getContent();
   for (const auto &pc : payload.getNestedPayloads()) {
     if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") {
       for (const auto &metrics_payload : pc.getNestedPayloads()) {
         if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") {
           for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
-            auto metric_content = queue_metrics.getContent();
-            for (const auto &payload_content : queue_metrics.getContent()) {
+            const auto& metric_content = queue_metrics.getContent();
+            for (const auto &payload_content : metric_content) {
               uint64_t size = 0;
               uint64_t max = 0;
-              for (auto content : payload_content.operation_arguments) {
+              for (const auto& content : payload_content.operation_arguments) {
                 if (content.first == "datasize") {
-                  size = std::stol(content.second.to_string());
+                  size = std::stoull(content.second.to_string());
                 } else if (content.first == "datasizemax") {
-                  max = std::stol(content.second.to_string());
+                  max = std::stoull(content.second.to_string());
                 }
               }
               std::lock_guard<std::mutex> lock(controller_mutex_);
@@ -285,7 +288,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
     }
   }
 
-  parse_content(content);
+  parse_content(payload.getContent());
 
   std::vector<uint8_t> buffer;
   buffer.resize(1024);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index e3ce368..5e3454d 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -337,8 +337,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<
  *
  */
 void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
-  std::shared_ptr<ResourceClaim> claim = content_session_->create();
-  int max_read = getpagesize();
+  const std::shared_ptr<ResourceClaim> claim = content_session_->create();
+  const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
   std::vector<uint8_t> charBuffer(max_read);
 
   try {
@@ -348,10 +348,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
     if (nullptr == content_stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
     }
-    int position = 0;
-    const int max_size = gsl::narrow<int>(stream.size());
+    size_t position = 0;
+    const auto max_size = stream.size();
     while (position < max_size) {
-      const int read_size = std::min(max_read, max_size - position);
+      const auto read_size = std::min(max_read, max_size - position);
       stream.read(charBuffer, read_size);
 
       content_stream->write(charBuffer.data(), read_size);
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 8a21f73..0d822c7 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,6 +24,7 @@
 
 #include "core/logging/LoggerConfiguration.h"
 #include "io/BaseStream.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
   size_t size = 0;
   uint8_t buffer[8192];
   do {
-    int read = stream->read(buffer, 8192);
-    if (read < 0) {
-      return -1;
-    }
-    if (read == 0) {
-      break;
-    }
+    const auto read = stream->read(buffer, 8192);
+    if (io::isError(read)) return -1;
+    if (read == 0) break;
     if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
       return -1;
     }
     size += read;
   } while (size < stream->size());
   _writeSucceeded = true;
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 6d3ebab..bd0a9d2 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) {
   return size;
 }
 
-int BufferStream::read(uint8_t *buf, int len) {
-  gsl_Expects(len >= 0);
-  int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_);
-  len = std::min(len, bytes_available_in_buffer);
-  auto begin = buffer_.begin() + readOffset_;
-  std::copy(begin, begin + len, buf);
+size_t BufferStream::read(uint8_t *buf, size_t len) {
+  const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
+  const auto readlen = std::min(len, gsl::narrow<size_t>(bytes_available_in_buffer));
+  const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+  std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 
   // increase offset for the next read
-  readOffset_ += len;
+  readOffset_ += readlen;
 
-  return len;
+  return readlen;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 6427447..57c6915 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -47,6 +47,8 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/file/FileUtils.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+
 namespace util = org::apache::nifi::minifi::utils;
 namespace mio = org::apache::nifi::minifi::io;
 
@@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) {
   return bytes;
 }
 
-int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
-  gsl_Expects(buflen >= 0);
-  int32_t total_read = 0;
+size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) {
+  size_t total_read = 0;
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
@@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         logger_->log_debug("fd %d close %i", fd, buflen);
         utils::file::FileUtils::close(socket_file_descriptor_);
       }
-      return -1;
+      return STREAM_ERROR;
     }
-    int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
+    const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
     logger_->log_trace("Recv call %d", bytes_read);
     if (bytes_read <= 0) {
       if (bytes_read == 0) {
@@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         int err = WSAGetLastError();
         if (err == WSAEWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err);
 #else
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno));
 
 #endif  // WIN32
       }
-      return -1;
+      return STREAM_ERROR;
     }
-    buflen -= bytes_read;
+    buflen -= gsl::narrow<size_t>(bytes_read);
     buf += bytes_read;
-    total_read += bytes_read;
+    total_read += gsl::narrow<size_t>(bytes_read);
     if (!retrieve_all_bytes) {
       break;
     }
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 2b5fc70..a3a4ee8 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd)
       logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
 }
 
-void DescriptorStream::seek(uint64_t offset) {
+void DescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int DescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t DescriptorStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (!IsNullOrEmpty(buf)) {
 #ifdef WIN32
-    auto size_read = _read(fd_, buf, buflen);
+    const auto size_read = _read(fd_, buf, buflen);
 #else
-    auto size_read = ::read(fd_, buf, buflen);
+    const auto size_read = ::read(fd_, buf, buflen);
 #endif
 
     if (size_read < 0) {
-      return -1;
+      return STREAM_ERROR;
     }
-    return  size_read;
+    return gsl::narrow<size_t>(size_read);
 
   } else {
-    return -1;
+    return STREAM_ERROR;
   }
 }
 
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 4f759ae..90fb0a2 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -25,6 +25,8 @@
 #include "io/FileStream.h"
 #include "io/InputStream.h"
 #include "io/OutputStream.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -97,13 +99,13 @@ void FileStream::close() {
   file_stream_.reset();
 }
 
-void FileStream::seek(uint64_t offset) {
+void FileStream::seek(size_t offset) {
   std::lock_guard<std::mutex> lock(file_lock_);
   if (file_stream_ == nullptr || !file_stream_->is_open()) {
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
     return;
   }
-  offset_ = gsl::narrow<size_t>(offset);
+  offset_ = offset;
   file_stream_->clear();
   if (!file_stream_->seekg(offset_))
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG;
@@ -142,8 +144,7 @@ int FileStream::write(const uint8_t *value, int size) {
   }
 }
 
-int FileStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t FileStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -151,32 +152,31 @@ int FileStream::read(uint8_t *buf, int buflen) {
     std::lock_guard<std::mutex> lock(file_lock_);
     if (file_stream_ == nullptr || !file_stream_->is_open()) {
       logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
-      return -1;
+      return STREAM_ERROR;
     }
-    file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+    file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen));
     if (file_stream_->eof() || file_stream_->fail()) {
       file_stream_->clear();
       seekToEndOfFile(READ_ERROR_MSG);
       auto tellg_result = file_stream_->tellg();
       if (tellg_result == std::streampos(-1)) {
         logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG;
-        return -1;
+        return STREAM_ERROR;
       }
-      size_t len = gsl::narrow<size_t>(tellg_result);
+      const auto len = gsl::narrow<size_t>(tellg_result);
       size_t ret = len - offset_;
       offset_ = len;
       length_ = len;
       logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
-      return gsl::narrow<int>(ret);
+      return ret;
     } else {
       offset_ += buflen;
-      file_stream_->seekp(offset_);
+      file_stream_->seekp(gsl::narrow<std::streamoff>(offset_));
       return buflen;
     }
-
   } else {
     logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG;
-    return -1;
+    return STREAM_ERROR;
   }
 }
 
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cdd101e..8daa499 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -16,13 +16,10 @@
  * limitations under the License.
  */
 #include <cstdio>
-#include <iostream>
 #include <vector>
 #include <string>
-#include <algorithm>
 #include "io/InputStream.h"
 #include "utils/gsl.h"
-#include "utils/OptionalUtils.h"
 
 namespace org {
 namespace apache {
@@ -30,67 +27,68 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-int InputStream::read(std::vector<uint8_t>& buffer, int len) {
-  if (buffer.size() < gsl::narrow<size_t>(len)) {
+size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
+  if (buffer.size() < len) {
     buffer.resize(len);
   }
-  int ret = read(buffer.data(), len);
-  buffer.resize((std::max)(ret, 0));
+  const auto ret = read(buffer.data(), len);
+  if (io::isError(ret)) return ret;
+  buffer.resize(ret);
   return ret;
 }
 
-int InputStream::read(bool &value) {
+size_t InputStream::read(bool &value) {
   uint8_t buf = 0;
 
   if (read(&buf, 1) != 1) {
-    return -1;
+    return STREAM_ERROR;
   }
   value = buf;
   return 1;
 }
 
-int InputStream::read(utils::Identifier &value) {
+size_t InputStream::read(utils::Identifier &value) {
   std::string uuidStr;
-  int ret = read(uuidStr);
-  if (ret < 0) {
+  const auto ret = read(uuidStr);
+  if (isError(ret)) {
     return ret;
   }
   auto optional_uuid = utils::Identifier::parse(uuidStr);
   if (!optional_uuid) {
-    return -1;
+    return STREAM_ERROR;
   }
   value = optional_uuid.value();
   return ret;
 }
 
-int InputStream::read(std::string &str, bool widen) {
-  uint32_t len = 0;
-  int ret = 0;
+size_t InputStream::read(std::string &str, bool widen) {
+  uint32_t string_length = 0;
+  size_t length_return = 0;
   if (!widen) {
     uint16_t shortLength = 0;
-    ret = read(shortLength);
-    len = shortLength;
+    length_return = read(shortLength);
+    string_length = shortLength;
   } else {
-    ret = read(len);
+    length_return = read(string_length);
   }
 
-  if (ret <= 0) {
-    return ret;
+  if (length_return == 0 || isError(length_return)) {
+    return length_return;
   }
 
-  if (len == 0) {
-    str = "";
-    return ret;
+  if (string_length == 0) {
+    str.clear();
+    return length_return;
   }
 
-  std::vector<uint8_t> buffer(len);
-  uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
-  if (bytes_read != len) {
-    return -1;
+  std::vector<uint8_t> buffer(string_length);
+  const auto read_return = read(buffer.data(), string_length);
+  if (read_return != string_length) {
+    return read_return;
   }
 
-  str = std::string(reinterpret_cast<const char*>(buffer.data()), len);
-  return ret + len;
+  str = std::string(reinterpret_cast<const char*>(buffer.data()), string_length);
+  return length_return + string_length;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c5b0b3f..ea93dea 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -23,6 +23,8 @@
 #include <string>
 #include <Exception.h>
 #include "io/validation.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -34,7 +36,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl)
       logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) {
 }
 
-void SecureDescriptorStream::seek(uint64_t offset) {
+void SecureDescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -71,34 +73,30 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
-  if (!IsNullOrEmpty(buf)) {
-    int total_read = 0;
-      int status = 0;
-      while (buflen) {
-        int sslStatus;
-        do {
-          status = SSL_read(ssl_, buf, buflen);
-          sslStatus = SSL_get_error(ssl_, status);
-        } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
-        if (status < 0)
-              break;
-
-        buflen -= status;
-        buf += status;
-        total_read += status;
-      }
+  if (IsNullOrEmpty(buf)) return STREAM_ERROR;
+  size_t total_read = 0;
+  uint8_t* writepos = buf;
+  while (buflen > total_read) {
+    int status;
+    int sslStatus;
+    do {
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(ssl_, writepos, ssl_read_size);
+      sslStatus = SSL_get_error(ssl_, status);
+    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
-      return total_read;
+    if (status < 0)
+      break;
 
-  } else {
-    return -1;
+    writepos += status;
+    total_read += gsl::narrow<size_t>(status);
   }
+
+  return total_read;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index a19e5b9..93422fa 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -350,36 +350,38 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
   return -1;
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) {
+  size_t total_read = 0;
   int status = 0;
   int loc = 0;
   int16_t fd = select_descriptor(1000);
   if (fd < 0) {
     close();
-    return -1;
+    return STREAM_ERROR;
   }
   auto fd_ssl = get_ssl(fd);
   if (IsNullOrEmpty(fd_ssl)) {
-    return -1;
+    return STREAM_ERROR;
   }
   if (!SSL_pending(fd_ssl)) {
     return 0;
   }
   while (buflen) {
     if (fd <= 0) {
-      return -1;
+      return STREAM_ERROR;
     }
     int sslStatus;
     do {
-      status = SSL_read(fd_ssl, buf + loc, buflen);
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(fd_ssl, buf + loc, ssl_read_size);
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
 
-    buflen -= status;
+    if (status < 0) break;
+
+    buflen -= gsl::narrow<size_t>(status);
     loc += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
@@ -417,33 +419,33 @@ int TLSSocket::write(const uint8_t *value, int size) {
   return writeData(value, size, fd);
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
+  size_t total_read = 0;
   int status = 0;
   while (buflen) {
-    int16_t fd = select_descriptor(1000);
+    const int16_t fd = select_descriptor(1000);
     if (fd < 0) {
       close();
-      return -1;
+      return STREAM_ERROR;
     }
 
     int sslStatus;
     do {
-      auto fd_ssl = get_ssl(fd);
+      const auto fd_ssl = get_ssl(fd);
       if (IsNullOrEmpty(fd_ssl)) {
-        return -1;
+        return STREAM_ERROR;
       }
-      status = SSL_read(fd_ssl, buf, buflen);
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(fd_ssl, buf, ssl_read_size);
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
     if (status < 0)
       break;
 
-    buflen -= status;
+    buflen -= gsl::narrow<size_t>(status);
     buf += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f9af7f9..f3b0715 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -235,145 +235,191 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
 }
 
 bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
-  int ret;
-
   org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
 
-  ret = outStream.read(uuid_);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
   uint32_t eventType;
-  ret = outStream.read(eventType);
-  if (ret != 4) {
-    return false;
+  {
+    const auto ret = outStream.read(eventType);
+    if (ret != 4) {
+      return false;
+    }
   }
-  this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
 
-  ret = outStream.read(this->_eventTime);
-  if (ret != 8) {
-    return false;
+  this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
+  {
+    const auto ret = outStream.read(this->_eventTime);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_entryDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_entryDate);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_eventDuration);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_eventDuration);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_lineageStartDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_lineageStartDate);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_componentId);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_componentId);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_componentType);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_componentType);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->flow_uuid_);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->flow_uuid_);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_details);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_details);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
   // read flow attributes
   uint32_t numAttributes = 0;
-  ret = outStream.read(numAttributes);
-  if (ret != 4) {
-    return false;
+  {
+    const auto ret = outStream.read(numAttributes);
+    if (ret != 4) {
+      return false;
+    }
   }
 
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
-    ret = outStream.read(key);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.read(key);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
     std::string value;
-    ret = outStream.read(value);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.read(value);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
     this->_attributes[key] = value;
   }
 
-  ret = outStream.read(this->_contentFullPath);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_contentFullPath);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_size);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_size);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_offset);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_offset);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_sourceQueueIdentifier);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_sourceQueueIdentifier);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
   if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
     // read UUIDs
     uint32_t number = 0;
-    ret = outStream.read(number);
-    if (ret != 4) {
-      return false;
+    {
+      const auto ret = outStream.read(number);
+      if (ret != 4) {
+        return false;
+      }
     }
 
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
-      ret = outStream.read(parentUUID);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = outStream.read(parentUUID);
+        if (ret == 0 || io::isError(ret)) {
+          return false;
+        }
       }
       this->addParentUuid(parentUUID);
     }
     number = 0;
-    ret = outStream.read(number);
-    if (ret != 4) {
-      return false;
+    {
+      const auto ret = outStream.read(number);
+      if (ret != 4) {
+        return false;
+      }
     }
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
-      ret = outStream.read(childUUID);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = outStream.read(childUUID);
+        if (ret == 0 || io::isError(ret)) {
+          return false;
+        }
       }
       this->addChildUuid(childUUID);
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
-    ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_transitUri);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-    ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_transitUri);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
-    ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
   }
 
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7ea213f..b60b2f4 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -114,28 +114,31 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
 
   logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);
 
-  int ret = peer_->write(getResourceName());
-
-  logger_->log_trace("result of writing resource name is %i", ret);
-  if (ret <= 0) {
-    logger_->log_debug("result of writing resource name is %i", ret);
-    // tearDown();
-    return false;
+  {
+    const auto ret = peer_->write(getResourceName());
+    logger_->log_trace("result of writing resource name is %i", ret);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing resource name is %i", ret);
+      // tearDown();
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing version is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version status code  %i", ret);
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || io::isError(ret)) {
+      logger_->log_debug("result of writing version status code  %i", ret);
+      return false;
+    }
   }
   logger_->log_debug("status code is %i", statusCode);
   switch (statusCode) {
@@ -144,9 +147,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || io::isError(ret)) {
+          return false;
+        }
       }
 
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
@@ -178,36 +183,40 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
 
   logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);
 
-  int ret = peer_->write(getCodecResourceName());
-
-  if (ret <= 0) {
-    logger_->log_debug("result of getCodecResourceName is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(getCodecResourceName());
+    if (ret <= 0) {
+      logger_->log_debug("result of getCodecResourceName is %i", ret);
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentCodecVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of _currentCodecVersion is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentCodecVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of _currentCodecVersion is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
-
   switch (statusCode) {
     case RESOURCE_OK:
       logger_->log_trace("Site2Site Codec Negotiate version OK");
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || io::isError(ret)) {
+          return false;
+        }
       }
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
 
@@ -237,10 +246,11 @@ bool RawSiteToSiteClient::handShake() {
   logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
   _commsIdentifier = id_generator_->generate();
 
-  int ret = peer_->write(_commsIdentifier);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->write(_commsIdentifier);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string> properties;
@@ -257,27 +267,33 @@ bool RawSiteToSiteClient::handShake() {
   }
 
   if (_currentVersion >= 3) {
-    ret = peer_->write(peer_->getURL());
+    const auto ret = peer_->write(peer_->getURL());
     if (ret <= 0) {
       return false;
     }
   }
 
-  uint32_t size = gsl::narrow<uint32_t>(properties.size());
-  ret = peer_->write(size);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto size = gsl::narrow<uint32_t>(properties.size());
+    const auto ret = peer_->write(size);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string>::iterator it;
   for (it = properties.begin(); it != properties.end(); it++) {
-    ret = peer_->write(it->first);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->first);
+      if (ret <= 0) {
+        return false;
+      }
     }
-    ret = peer_->write(it->second);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->second);
+      if (ret <= 0) {
+        return false;
+      }
     }
     logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
   }
@@ -285,10 +301,11 @@ bool RawSiteToSiteClient::handShake() {
   RespondCode code;
   std::string message;
 
-  ret = readRespond(nullptr, code, message);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = readRespond(nullptr, code, message);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::string error;
@@ -310,13 +327,11 @@ bool RawSiteToSiteClient::handShake() {
     // Unknown error
     default:
       logger_->log_error("HandShake Failed because of unknown respond code %d", code);
-      ret = -1;
       return false;
   }
 
   // All known error cases handled here
   logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
-  ret = -1;
   return false;
 }
 
@@ -334,48 +349,54 @@ void RawSiteToSiteClient::tearDown() {
 
 bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
   if (establish() && handShake()) {
-    int status = writeRequestType(REQUEST_PEER_LIST);
-
-    if (status <= 0) {
+    if (writeRequestType(REQUEST_PEER_LIST) <= 0) {
       tearDown();
       return false;
     }
 
-    uint32_t number;
-    status = peer_->read(number);
-
-    if (status <= 0) {
-      tearDown();
-      return false;
+    uint32_t number_of_peers;
+    {
+      const auto ret = peer_->read(number_of_peers);
+      if (ret == 0 || io::isError(ret)) {
+        tearDown();
+        return false;
+      }
     }
 
-    for (uint32_t i = 0; i < number; i++) {
+    for (uint32_t i = 0; i < number_of_peers; i++) {
       std::string host;
-      status = peer_->read(host);
-      if (status <= 0) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(host);
+        if (ret == 0 || io::isError(ret)) {
+          tearDown();
+          return false;
+        }
       }
       uint32_t port;
-      status = peer_->read(port);
-      if (status <= 0) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(port);
+        if (ret == 0 || io::isError(ret)) {
+          tearDown();
+          return false;
+        }
       }
       uint8_t secure;
-      status = peer_->read(secure);
-      if (status <= 0) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(secure);
+        if (ret == 0 || io::isError(ret)) {
+          tearDown();
+          return false;
+        }
       }
       uint32_t count;
-      status = peer_->read(count);
-      if (status <= 0) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(count);
+        if (ret == 0 || io::isError(ret)) {
+          tearDown();
+          return false;
+        }
       }
-      PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
-      peers.push_back(std::move(status));
+      peers.push_back(PeerStatus{std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true});
       logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
     }
 
@@ -397,15 +418,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
   int RawSiteToSiteClient::readRequestType(RequestType &type) {
     std::string requestTypeStr;
 
-    int ret = peer_->read(requestTypeStr);
-
-    if (ret <= 0)
-      return ret;
+    const auto ret = peer_->read(requestTypeStr);
+    if (ret == 0 || io::isError(ret))
+      return static_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
       if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
         type = (RequestType) i;
-        return ret;
+        return static_cast<int>(ret);
       }
     }
 
@@ -417,7 +437,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac
 }
 
 int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
-  return writeResponse(transaction, code, message);
+  return writeResponse(transaction, code, std::move(message));
 }
 
 bool RawSiteToSiteClient::negotiateCodec() {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 1833144..60cef54 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,47 +31,43 @@ namespace sitetosite {
 
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
-
-  int ret = peer_->read(firstByte);
-
-  if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
-    return -1;
+  {
+    const auto ret = peer_->read(firstByte);
+    if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
+      return -1;
+  }
 
   uint8_t secondByte;
-
-  ret = peer_->read(secondByte);
-
-  if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
-    return -1;
+  {
+    const auto ret = peer_->read(secondByte);
+    if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
+      return -1;
+  }
 
   uint8_t thirdByte;
-
-  ret = peer_->read(thirdByte);
-
-  if (ret <= 0)
-    return ret;
+  {
+    const auto ret = peer_->read(thirdByte);
+    if (ret == 0 || io::isError(ret))
+      return static_cast<int>(ret);
+  }
 
   code = (RespondCode) thirdByte;
-
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-  if (resCode == NULL) {
-    // Not a valid respond code
+  if (!resCode) {
     return -1;
   }
   if (resCode->hasDescription) {
-    ret = peer_->read(message);
-    if (ret <= 0)
+    const auto ret = peer_->read(message);
+    if (ret == 0 || io::isError(ret))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
 }
 
 void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   auto it = this->known_transactions_.find(transactionID);
-
   if (it == known_transactions_.end()) {
     return;
   } else {
@@ -85,7 +81,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
 int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
-  if (resCode == NULL) {
+  if (!resCode) {
     // Not a valid respond code
     return -1;
   }
@@ -205,7 +201,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
 
 bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
   int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -529,8 +525,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
 }
 
 bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
-  int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -568,9 +563,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     RespondCode code;
     std::string message;
 
-    ret = readResponse(transaction, code, message);
-
-    if (ret <= 0) {
+    if (readResponse(transaction, code, message) <= 0) {
       return false;
     }
     if (code == CONTINUE_TRANSACTION) {
@@ -595,9 +588,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   // start to read the packet
   uint32_t numAttributes;
-  ret = transaction->getStream().read(numAttributes);
-  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
-    return false;
+  {
+    const auto ret = transaction->getStream().read(numAttributes);
+    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
+      return false;
+    }
   }
 
   // read the attributes
@@ -605,22 +600,28 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
   for (unsigned int i = 0; i < numAttributes; i++) {
     std::string key;
     std::string value;
-    ret = transaction->getStream().read(key, true);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = transaction->getStream().read(key, true);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
-    ret = transaction->getStream().read(value, true);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = transaction->getStream().read(value, true);
+      if (ret == 0 || io::isError(ret)) {
+        return false;
+      }
     }
     packet->_attributes[key] = value;
     logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
   }
 
   uint64_t len;
-  ret = transaction->getStream().read(len);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = transaction->getStream().read(len);
+    if (ret == 0 || io::isError(ret)) {
+      return false;
+    }
   }
 
   packet->_size = len;
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 33c9634..a64c3ac 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   if (stream->size() > 0) {
     std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
     readFully(buffer.get(), stream->size());
-    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size()));
-    return stream->size();
+    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size());
+    return gsl::narrow<int64_t>(stream->size());
   }
-  return size_.load();
+  return gsl::narrow<int64_t>(size_.load());
 }
 
 int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
   stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written));
-  return stream->size();
+  return gsl::narrow<int64_t>(stream->size());
 }
 
 void StreamOutputCallback::write(char *data, size_t size) {
@@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) {
   write_and_notify(data, size);
 }
 
-const std::vector<char> ByteOutputCallback::to_string() {
+std::vector<char> ByteOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.resize(size_.load());
   readFully(buffer.data(), size_.load());
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index df39220..8ce8ff2 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -24,20 +24,21 @@
 #include <vector>
 
 #include "FlowFileRecord.h"
+#include "Stream.h"
 #include "utils/gsl.h"
 
 class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
  public:
   explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer) {}
 
-  int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+  size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
     uint8_t tmpBuffer[4096]{};
     std::size_t remaining_len = len;
-    int total_read = 0;
+    size_t total_read = 0;
     while (remaining_len > 0) {
-      auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer))));
+      const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (minifi::io::isError(ret)) return ret;
       remaining_len -= ret;
       total_read += ret;
       auto prevSize = buffer_.size();
@@ -48,7 +49,8 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
   }
 
   int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), stream->size());
+    const auto write_result = write(*stream.get(), stream->size());
+    return minifi::io::isError(write_result) ? -1 : gsl::narrow<int64_t>(write_result);
   }
 
  private:
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 76507df..de1a774 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,29 +41,28 @@
 #include "processors/PutFile.h"
 #include "utils/file/FileUtils.h"
 #include "../Utils.h"
+#include "utils/gsl.h"
 
 class ReadCallback: public minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(size_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_ = nullptr;
-    archive_buffer_size_ = 0;
-  }
-  ~ReadCallback() {
-    if (buffer_)
-      delete[] buffer_;
-    if (archive_buffer_)
-      delete[] archive_buffer_;
-  }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  explicit ReadCallback(size_t size)
+      :buffer_size_{size}  // default member initializers use this
+  {}
+  ReadCallback(const ReadCallback&) = delete;
+  ReadCallback(ReadCallback&&) = delete;
+  ReadCallback& operator=(const ReadCallback&) = delete;
+  ReadCallback& operator=(ReadCallback&&) = delete;
+
+  ~ReadCallback() override {
+    delete[] buffer_;
+    delete[] archive_buffer_;
+  }
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
     int64_t total_read = 0;
-    int64_t ret = 0;
     do {
-      ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_));
+      const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (minifi::io::isError(ret)) return -1;
       read_size_ += gsl::narrow<size_t>(ret);
       total_read += ret;
     } while (buffer_size_ != read_size_);
@@ -78,18 +77,18 @@ class ReadCallback: public minifi::InputStreamCallback {
     struct archive_entry *ae;
 
     REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
-    int size = gsl::narrow<int>(archive_entry_size(ae));
+    const auto size = archive_entry_size(ae);
     archive_buffer_ = new char[size];
     archive_buffer_size_ = size;
-    archive_read_data(a, archive_buffer_, size);
+    archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size));
     archive_read_free(a);
   }
 
-  uint8_t *buffer_;
   size_t buffer_size_;
-  size_t read_size_;
-  char *archive_buffer_;
-  int archive_buffer_size_;
+  uint8_t *buffer_ = new uint8_t[buffer_size_];
+  size_t read_size_ = 0;
+  char *archive_buffer_ = nullptr;
+  int64_t archive_buffer_size_ = 0;
 };
 
 /**
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 60e233f..4ad473d 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback {
     REQUIRE(size_ + len <= capacity_);
     int total_read = 0;
     do {
-      auto ret = input.read(end(), gsl::narrow<int>(len));
+      const auto ret = input.read(end(), len);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (minifi::io::isError(ret)) return -1;
       size_ += ret;
       len -= ret;
       total_read += ret;
@@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     return total_read;
   }
   int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), capacity_);
+    return write(*stream, capacity_);
   }
 
  private:
@@ -111,8 +111,9 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
   class ArchiveEntryReader {
    public:
     explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
-    int read(uint8_t* out, std::size_t len) {
-      return gsl::narrow<int>(archive_read_data(arch, out, len));
+    size_t read(uint8_t* out, std::size_t len) {
+      const auto ret = archive_read_data(arch, out, len);
+      return ret < 0 ? minifi::io::STREAM_ERROR : gsl::narrow<size_t>(ret);
     }
    private:
     archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 059dfa9..f63947b 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -56,12 +56,10 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
   str = "";
   uint8_t buffer[4096]{};
   while (true) {
-    auto ret = stream->read(buffer, sizeof(buffer));
-    REQUIRE(ret >= 0);
-    if (ret == 0) {
-      break;
-    }
-    str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)};
+    const auto ret = stream->read(buffer, sizeof(buffer));
+    REQUIRE(!minifi::io::isError(ret));
+    if (ret == 0) { break; }
+    str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
   return stream;
 }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 6409f3e..712d9f8 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -103,8 +103,8 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  // error tells us we have an invalid stream
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -139,8 +139,8 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  // error tells us we have an invalid stream
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
@@ -179,7 +179,6 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");
@@ -227,7 +226,6 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 88b3929..69336b8 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
 
   minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
 
-  REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+  REQUIRE(minifi::io::isError(nonExistingStream.read(nullptr, 0)));
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index b13dcfa..57bae57 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1);
+  REQUIRE(minifi::io::isError(stream.read(nullptr, stream.size())));
 
   data = verifybuffer.data();
 
-  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty());
 
   std::remove(ss.str().c_str());
 }
@@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE(stream.read(readBuffer, 1) == -1);
+  REQUIRE(minifi::io::isError(stream.read(readBuffer, 1)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
 }
 
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE_FALSE(stream.read(readBuffer, 11) == -1);
+  REQUIRE_FALSE(minifi::io::isError(stream.read(readBuffer, 11)));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
   stream.seek(0);
-  REQUIRE(stream.read(nullptr, 11) == -1);
+  REQUIRE(minifi::io::isError(stream.read(nullptr, 11)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
 }
 
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index b0c9139..6e913b9 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -60,7 +60,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
    * @param len length to read
    * @return resulting read size
    **/
-  int read(uint8_t *value, int len) override {
+  size_t read(uint8_t *value, size_t len) override {
     return server_responses_.read(value, len);
   }
 };
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index bdb7e37..d3808a1 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
     std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
                                                                                            *content_repo);
     auto stream = (*content_repo)->read(*claim);
-    return stream->read(target, size);
+    return static_cast<int>(stream->read(target, static_cast<size_t>(size)));
   } else {
     file_buffer fb = file_to_buffer(ff->contentLocation);
     if (size < 0) {
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index e5af7b7..3db3f1c 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -39,6 +39,7 @@
 #include "core/cstructs.h"
 #include "RandomServerSocket.h"
 #include "core/log.h"
+#include "utils/gsl.h"
 
 #define FMT_DEFAULT fmt_lower
 
@@ -144,15 +145,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
   size_t read_len = 0;
   while (!found_codec) {
     uint8_t handshake_data[1000];
-    int actual_len = stream->read(handshake_data+read_len, 1000-read_len);
-    if (actual_len <= 0) {
+    const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
+    if(actual_len == 0 || minifi::io::isError(actual_len)) {
       continue;
     }
     read_len += actual_len;
-    std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
-    auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
-    if (it != incoming_data.end()) {
-      size_t idx = std::distance(incoming_data.begin(), it);
+    const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
+    const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
+    if(it != incoming_data.end()){
+      const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it));
       // Actual version follows the string as an uint32_t // that should be the end of the buffer
       found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
     }