You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/05/26 12:46:09 UTC

[nifi-minifi-cpp] 01/17: convert InputStream::read to size_t

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 5c41fff9c012860cebb8c830c38057b93efef7dc
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Fri Mar 12 14:10:55 2021 +0100

    convert InputStream::read to size_t
---
 extensions/aws/processors/PutS3Object.h            | 11 ++--
 extensions/civetweb/processors/ListenHTTP.h        | 17 +++---
 extensions/http-curl/client/HTTPCallback.h         |  2 +-
 extensions/http-curl/client/HTTPStream.cpp         |  9 ++--
 extensions/http-curl/client/HTTPStream.h           |  4 +-
 extensions/http-curl/tests/CivetStream.h           |  8 ++-
 extensions/http-curl/tests/HTTPHandlers.h          |  9 ++--
 extensions/jni/jvm/JniReferenceObjects.h           |  3 +-
 extensions/libarchive/CompressContent.h            | 49 +++++++++--------
 extensions/libarchive/FocusArchiveEntry.cpp        |  9 ++--
 extensions/librdkafka/PublishKafka.cpp             |  7 ++-
 extensions/mqtt/processors/ConvertJSONAck.h        | 13 ++---
 extensions/mqtt/processors/ConvertUpdate.cpp       |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           | 18 ++++---
 extensions/opc/src/putopc.cpp                      | 14 ++---
 extensions/rocksdb-repos/RocksDbStream.cpp         | 35 +++++-------
 extensions/rocksdb-repos/RocksDbStream.h           |  4 +-
 extensions/script/python/PyBaseStream.cpp          |  8 ++-
 extensions/script/python/PyBaseStream.h            |  2 +-
 extensions/sftp/client/SFTPClient.cpp              | 14 ++---
 .../standard-processors/processors/ExtractText.cpp | 22 ++++----
 .../standard-processors/processors/LogAttribute.h  | 20 ++++---
 .../standard-processors/processors/PutFile.cpp     | 25 ++++-----
 .../standard-processors/processors/PutFile.h       |  2 +-
 libminifi/include/io/AtomicEntryStream.h           | 13 +++--
 libminifi/include/io/BufferStream.h                |  6 +--
 libminifi/include/io/CRCStream.h                   |  6 +--
 libminifi/include/io/ClientSocket.h                |  4 +-
 libminifi/include/io/DescriptorStream.h            |  4 +-
 libminifi/include/io/FileStream.h                  |  4 +-
 libminifi/include/io/InputStream.h                 | 16 +++---
 libminifi/include/io/Stream.h                      |  2 +-
 libminifi/include/io/StreamPipe.h                  | 12 ++---
 libminifi/include/io/tls/SecureDescriptorStream.h  |  4 +-
 libminifi/include/io/tls/TLSSocket.h               |  4 +-
 libminifi/include/provenance/Provenance.h          | 16 +++---
 libminifi/include/sitetosite/Peer.h                |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 55 ++++++++-----------
 libminifi/include/utils/ByteArrayCallback.h        |  7 +--
 libminifi/include/utils/Enum.h                     |  4 +-
 libminifi/include/utils/FileOutputCallback.h       |  2 +-
 libminifi/src/FlowFileRecord.cpp                   | 14 +++--
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 63 +++++++++++-----------
 libminifi/src/core/ProcessSession.cpp              | 10 ++--
 libminifi/src/core/ProcessSessionReadCallback.cpp  | 13 ++---
 libminifi/src/io/BufferStream.cpp                  | 15 +++---
 libminifi/src/io/ClientSocket.cpp                  | 21 ++++----
 libminifi/src/io/DescriptorStream.cpp              | 15 +++---
 libminifi/src/io/FileStream.cpp                    | 22 ++++----
 libminifi/src/io/InputStream.cpp                   | 27 +++++-----
 libminifi/src/io/tls/SecureDescriptorStream.cpp    | 41 +++++++-------
 libminifi/src/io/tls/TLSSocket.cpp                 | 36 ++++++-------
 libminifi/src/provenance/Provenance.cpp            | 32 ++++++-----
 libminifi/src/sitetosite/RawSocketProtocol.cpp     | 35 ++++++------
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 35 ++++++------
 libminifi/src/utils/ByteArrayCallback.cpp          | 10 ++--
 libminifi/src/utils/FileOutputCallback.cpp         |  2 +-
 libminifi/test/BufferReader.h                      | 10 ++--
 .../test/archive-tests/CompressContentTests.cpp    | 40 ++++++--------
 libminifi/test/archive-tests/MergeFileTests.cpp    | 10 ++--
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  9 ++--
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  4 +-
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |  2 +-
 libminifi/test/unit/FileStreamTests.cpp            | 28 +++++-----
 libminifi/test/unit/SiteToSiteHelper.h             |  2 +-
 nanofi/src/api/nanofi.cpp                          |  2 +-
 nanofi/tests/CSite2SiteTests.cpp                   | 11 ++--
 67 files changed, 458 insertions(+), 519 deletions(-)

diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index ad3072a..e55ac1e 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -31,6 +31,7 @@
 
 #include "S3Processor.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
 
 template<typename T>
 class S3TestsFixture;
@@ -94,20 +95,20 @@ class PutS3Object : public S3Processor {
       auto data_stream = std::make_shared<std::stringstream>();
       read_size_ = 0;
       while (read_size_ < flow_size_) {
-        auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
-        int read_ret = stream->read(buffer, next_read_size);
-        if (read_ret < 0) {
+        const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
+        const auto read_ret = stream->read(buffer.data(), next_read_size);
+        if (read_ret == static_cast<size_t>(-1)) {
           return -1;
         }
         if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
+          data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
           read_size_ += read_ret;
         } else {
           break;
         }
       }
       result_ = s3_wrapper_.putObject(options_, data_stream);
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
 
     uint64_t flow_size_;
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index adf4a76..960185c 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -32,6 +32,7 @@
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/MinifiConcurrentQueue.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -48,7 +49,7 @@ class ListenHTTP : public core::Processor {
   /*!
    * Create a new processor
    */
-  ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
+  explicit ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<ListenHTTP>::getLogger()),
         batch_size_(0) {
@@ -56,7 +57,7 @@ class ListenHTTP : public core::Processor {
     callbacks_.log_access = &logAccess;
   }
   // Destructor
-  virtual ~ListenHTTP();
+  ~ListenHTTP() override;
   // Processor Name
   static constexpr char const *ProcessorName = "ListenHTTP";
   // Supported Properties
@@ -131,14 +132,12 @@ class ListenHTTP : public core::Processor {
     }
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       out_str_->resize(stream->size());
-      uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
-                                           gsl::narrow<int>(stream->size()));
-
+      const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size());
       if (num_read != stream->size()) {
         throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
       }
 
-      return num_read;
+      return gsl::narrow<int64_t>(num_read);
     }
 
    private:
@@ -148,7 +147,7 @@ class ListenHTTP : public core::Processor {
   // Write callback for transferring data from HTTP request to content repo
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(std::unique_ptr<io::BufferStream>);
+    explicit WriteCallback(std::unique_ptr<io::BufferStream>);
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
@@ -159,11 +158,11 @@ class ListenHTTP : public core::Processor {
     try {
       struct mg_context* ctx = mg_get_context(conn);
       /* CivetServer stores 'this' as the userdata when calling mg_start */
-      CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
+      auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx));
       if (server == nullptr) {
         return 0;
       }
-      std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
+      auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
       if (logger == nullptr) {
         return 0;
       }
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 921a8c5..242990e 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -83,7 +83,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
 
     if (stream->size() > 0) {
       vec.resize(stream->size());
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
     return processInner(std::move(vec));
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 5ba7f3f..f9b8f49 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -51,7 +51,7 @@ void HttpStream::close() {
   http_read_callback_.close();
 }
 
-void HttpStream::seek(uint64_t /*offset*/) {
+void HttpStream::seek(size_t /*offset*/) {
   // seek is an unnecessary part of this implementatino
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
@@ -81,8 +81,7 @@ int HttpStream::write(const uint8_t *value, int size) {
   }
 }
 
-int HttpStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t HttpStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -97,10 +96,10 @@ int HttpStream::read(uint8_t *buf, int buflen) {
         started_ = true;
       }
     }
-    return gsl::narrow<int>(http_read_callback_.readFully((char*) buf, buflen));
+    return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
 
   } else {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index 711d1d2..8d49d02 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -77,7 +77,7 @@ class HttpStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return written;
@@ -91,7 +91,7 @@ class HttpStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 9cbba27..e333b54 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -28,6 +28,8 @@
 #include "io/BaseStream.h"
 #include "civetweb.h"
 #include "CivetServer.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -49,8 +51,10 @@ class CivetStream : public io::InputStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override {
-    return mg_read(conn, buf, buflen);
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = mg_read(conn, buf, buflen);
+    if (ret < 0) return static_cast<size_t>(-1);
+    return gsl::narrow_cast<size_t>(ret);
   }
 
  protected:
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index cedb834..5c49205 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -214,7 +214,7 @@ class FlowFileResponder : public ServerAwareHandler {
       minifi::io::CivetStream civet_stream(conn);
       minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
       uint32_t num_attributes;
-      int read;
+      size_t read;
       uint64_t total_size = 0;
       read = stream.read(num_attributes);
       if(!isServerRunning())return false;
@@ -241,9 +241,10 @@ class FlowFileResponder : public ServerAwareHandler {
       flow->data.resize(gsl::narrow<size_t>(length));
       flow->total_size = total_size;
 
-      read = stream.read(flow->data.data(), gsl::narrow<int>(length));
-      if(!isServerRunning())return false;
-      assert(read == gsl::narrow<int>(length));
+      read = stream.read(flow->data.data(), length);
+      if (!isServerRunning()) return false;
+      (void)read;
+      assert(read == length);
 
       if (!invalid_checksum) {
         site2site_rest_resp = std::to_string(stream.getCRC());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 67798e9..f81391b 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -142,7 +142,8 @@ class JniByteInputStream : public minifi::InputStreamCallback {
     int writtenOffset = 0;
     int read = 0;
     do {
-      int actual = stream_->read(buffer_, std::min(remaining, buffer_size_));
+      // JNI takes size as int, there's not much we can do here to support 2GB+ sizes
+      int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_)));
       if (actual <= 0) {
         if (read == 0) {
           stream_ = nullptr;
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index fc7851e..df7fc75 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -21,6 +21,7 @@
 #define __COMPRESS_CONTENT_H__
 
 #include <cinttypes>
+#include <utility>
 
 #include "archive_entry.h"
 #include "archive.h"
@@ -56,7 +57,7 @@ public:
     , encapsulateInTar_(false) {
   }
   // Destructor
-  virtual ~CompressContent() = default;
+  ~CompressContent() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "CompressContent";
   // Supported Properties
@@ -94,8 +95,8 @@ public:
     ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
         flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
     }
-    ~ReadCallbackCompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallbackCompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       uint8_t buffer[4096U];
       int64_t ret = 0;
       uint64_t read_size = 0;
@@ -107,13 +108,13 @@ public:
         return -1;
       }
       while (read_size < flow_->getSize()) {
-        ret = stream->read(buffer, sizeof(buffer));
-        if (ret < 0) {
+        const auto readret = stream->read(buffer, sizeof(buffer));
+        if (readret == static_cast<size_t>(-1)) {
           status_ = -1;
           return -1;
         }
-        if (ret > 0) {
-          ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
+        if (readret > 0) {
+          ret = archive_write_data(arch_, buffer, readret);
           if (ret < 0) {
             logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
             status_ = -1;
@@ -124,7 +125,7 @@ public:
           break;
         }
       }
-      return read_size;
+      return gsl::narrow<int64_t>(read_size);
     }
     std::shared_ptr<core::FlowFile> flow_;
     struct archive *arch_;
@@ -135,25 +136,23 @@ public:
   // Nest Callback Class for read stream from flow for decompress
   class ReadCallbackDecompress: public InputStreamCallback {
   public:
-    ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
-        read_size_(0), offset_(0), flow_(flow) {
-      origin_offset_ = flow_->getOffset();
+    explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
+        flow_(std::move(flow)) {
     }
-    ~ReadCallbackDecompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallbackDecompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       read_size_ = 0;
       stream->seek(offset_);
-      int readRet = stream->read(buffer_, sizeof(buffer_));
+      const auto readRet = stream->read(buffer_, sizeof(buffer_));
       read_size_ = readRet;
-      if (readRet > 0) {
-        offset_ += read_size_;
+      if (readRet != static_cast<size_t>(-1)) {
+        offset_ += readRet;
       }
-      return readRet;
+      return gsl::narrow<int64_t>(readRet);
     }
-    int64_t read_size_;
-    uint8_t buffer_[8192];
-    uint64_t offset_;
-    uint64_t origin_offset_;
+    size_t read_size_ = 0;
+    uint8_t buffer_[8192] = {0};
+    size_t offset_ = 0;
     std::shared_ptr<core::FlowFile> flow_;
   };
   // Nest Callback Class for write stream
@@ -383,8 +382,8 @@ public:
           std::vector<uint8_t> buffer(16 * 1024U);
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
-            int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-            if (ret < 0) {
+            const auto ret = inputStream->read(buffer.data(), buffer.size());
+            if (ret == static_cast<size_t>(-1)) {
               return -1;
             } else if (ret == 0) {
               break;
@@ -414,7 +413,7 @@ public:
 
       success_ = filterStream->isFinished();
 
-      return flow_->getSize();
+      return gsl::narrow<int64_t>(flow_->getSize());
     }
   };
 
@@ -440,7 +439,7 @@ private:
   void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
 
   std::shared_ptr<logging::Logger> logger_;
-  int compressLevel_;
+  int compressLevel_{};
   CompressionMode compressMode_;
   ExtendedCompressionFormat compressFormat_;
   bool updateFileName_;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 59a43e0..9452dce 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -34,6 +34,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -150,20 +151,20 @@ typedef struct {
 la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
   auto data = static_cast<FocusArchiveEntryReadData *>(d);
   *buf = data->buf;
-  int read = 0;
-  int last_read = 0;
+  size_t read = 0;
+  size_t last_read = 0;
 
   do {
     last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
     read += last_read;
-  } while (data->processor->isRunning() && last_read > 0 && read < 8196);
+  } while (data->processor->isRunning() && last_read > 0 && last_read != static_cast<size_t>(-1) && read < 8196);
 
   if (!data->processor->isRunning()) {
     archive_set_error(a, EINTR, "Processor shut down during read");
     return -1;
   }
 
-  return read;
+  return gsl::narrow<la_ssize_t>(read);
 }
 
 int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b83d028..bbb66a8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback {
     }
 
     for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
-      const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-      if (readRet < 0) {
+      const auto readRet = stream->read(buffer.data(), buffer.size());
+      if (readRet == static_cast<size_t>(-1)) {
         status_ = -1;
         error_ = "Failed to read from stream";
         return read_size_;
       }
-
-      if (readRet <= 0) { break; }
+      if (readRet == 0) { break; }
 
       const auto err = produce(segment_num, buffer, readRet);
       if (err) {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 47f53d0..659af59 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -31,6 +31,8 @@
 #include "MQTTClient.h"
 #include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase {
         logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
   }
   // Destructor
-  virtual ~ConvertJSONAck() = default;
+  ~ConvertJSONAck() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "ConvertJSONAck";
 
@@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase {
   class ReadCallback : public InputStreamCallback {
    public:
     ReadCallback() = default;
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (nullptr == stream)
         return 0;
       buffer_.resize(stream->size());
-      ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
-      return ret;
+      const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
+      return gsl::narrow<int64_t>(ret);
     }
     std::vector<char> buffer_;
   };
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index ca5a8d5..69da1fc 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   bool received_update = false;
   while (mqtt_service_->get(100, listening_topic, update)) {
     // first we have the input topic string followed by the update URI
-    if (update.size() > 0) {
+    if (!update.empty()) {
 
       io::BufferStream stream(update.data(), update.size());
 
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index fdad09f..ba14c0f 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -20,6 +20,8 @@
 #ifndef __PUBLISH_MQTT_H__
 #define __PUBLISH_MQTT_H__
 
+#include <limits>
+
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -50,7 +52,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
     max_seg_size_ = ULLONG_MAX;
   }
   // Destructor
-  virtual ~PublishMQTT() = default;
+  ~PublishMQTT() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "PublishMQTT";
   // Supported Properties
@@ -74,18 +76,20 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       read_size_ = 0;
     }
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
+      gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
       std::vector<unsigned char> buffer(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       while (read_size_ < flow_size_) {
-        int readRet = stream->read(&buffer[0], max_seg_size_);
+        // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
+        int readRet = static_cast<int>(stream->read(&buffer[0], max_seg_size_));
         if (readRet < 0) {
           status_ = -1;
-          return read_size_;
+          return gsl::narrow<int64_t>(read_size_);
         }
         if (readRet > 0) {
           MQTTClient_message pubmsg = MQTTClient_message_initializer;
@@ -97,12 +101,12 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
             status_ = -1;
             return -1;
           }
-          read_size_ += readRet;
+          read_size_ += gsl::narrow<size_t>(readRet);
         } else {
           break;
         }
       }
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
     uint64_t flow_size_;
     uint64_t max_seg_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 1f3fe63..e613d1b 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -428,21 +428,15 @@ namespace processors {
     uint64_t size = 0;
 
     do {
-      int read = stream->read(buf_.data() + size, 1024);
-
-      if (read < 0) {
-        return -1;
-      }
-
-      if (read == 0) {
-        break;
-      }
+      const auto read = stream->read(buf_.data() + size, 1024);
+      if (read == static_cast<size_t>(-1)) return -1;
+      if (read == 0) break;
       size += read;
     } while (size < stream->size());
 
     logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size());
 
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 
 } /* namespace processors */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 86daa20..27b342e 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
 void RocksDbStream::close() {
 }
 
-void RocksDbStream::seek(uint64_t /*offset*/) {
+void RocksDbStream::seek(size_t /*offset*/) {
   // noop
 }
 
@@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) {
   }
 }
 
-int RocksDbStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  if (!exists_) {
-    return -1;
-  }
-  if (buflen == 0) {
-    return 0;
-  }
-  if (!IsNullOrEmpty(buf)) {
-    size_t amtToRead = gsl::narrow<size_t>(buflen);
-    if (offset_ >= value_.size()) {
-      return 0;
-    }
-    if (amtToRead > value_.size() - offset_) {
-      amtToRead = value_.size() - offset_;
-    }
-    std::memcpy(buf, value_.data() + offset_, amtToRead);
-    offset_ += amtToRead;
-    return gsl::narrow<int>(amtToRead);
-  } else {
-    return -1;
-  }
+size_t RocksDbStream::read(uint8_t *buf, size_t buflen) {
+  // The check have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed
+  if (!exists_) return static_cast<size_t>(-1);
+  if (buflen == 0) return 0;
+  if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+  if (offset_ >= value_.size()) return 0;
+
+  const auto amtToRead = std::min(buflen, value_.size() - offset_);
+  std::memcpy(buf, value_.data() + offset_, amtToRead);
+  offset_ += amtToRead;
+  return amtToRead;
 }
 
 } /* namespace io */
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 8e47b51..d051802 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return size_;
@@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index fcde480..1733994 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) {
 
   std::vector<uint8_t> buffer(len);
 
-  auto read = stream_->read(buffer.data(), static_cast<int>(len));
-  auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
-
-  return result;
+  const auto read = stream_->read(buffer.data(), len);
+  return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
 }
 
-size_t PyBaseStream::write(py::bytes buf) {
+size_t PyBaseStream::write(const py::bytes& buf) {
   const auto &&buf_str = buf.operator std::string();
   return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())),
                                                 static_cast<int>(buf_str.length())));
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h
index f07fe80..1f11065 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyBaseStream.h
@@ -37,7 +37,7 @@ class PyBaseStream {
 
   py::bytes read();
   py::bytes read(size_t len = 0);
-  size_t write(py::bytes buf);
+  size_t write(const py::bytes& buf);
 
  private:
   std::shared_ptr<io::BaseStream> stream_;
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index e657d2e..8d528ab 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     return true;
   }
 
-  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE);
+  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
   std::vector<uint8_t> buf(buf_size);
   uint64_t total_read = 0U;
   do {
-    int read_ret = input.read(buf.data(), buf.size());
-    if (read_ret < 0) {
+    const auto read_ret = input.read(buf.data(), buf.size());
+    if (read_ret == static_cast<size_t>(-1)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
       return false;
@@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     }
     logger_->log_trace("Read %d bytes", read_ret);
     total_read += read_ret;
-    ssize_t remaining = read_ret;
+    auto remaining = gsl::narrow<size_t>(read_ret);
     while (remaining > 0) {
-      int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
+      const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
         last_error_.setSftpError(SFTPError::IoError);
         logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
         return false;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
-      remaining -= write_ret;
+      remaining -= gsl::narrow<size_t>(write_ret);
     }
   } while (true);
 
-  if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
+  if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
     last_error_.setLibssh2Error(LIBSSH2_FX_OK);
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 345b84a..04e5e91 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
 }
 
 int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  int64_t ret = 0;
-  uint64_t read_size = 0;
+  size_t read_size = 0;
   bool regex_mode;
-  uint64_t size_limit = flowFile_->getSize();
+  size_t size_limit = flowFile_->getSize();
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   if (sizeLimitStr.empty())
     size_limit = DEFAULT_SIZE_LIMIT;
   else if (sizeLimitStr != "0")
-    size_limit = std::stoi(sizeLimitStr);
+    size_limit = gsl::narrow_cast<size_t>(std::stoi(sizeLimitStr));
 
   std::ostringstream contentStream;
 
   while (read_size < size_limit) {
     // Don't read more than config limit or the size of the buffer
-    int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size()));
-    ret = stream->read(buffer_, length);
+    const auto length = std::min(size_limit - read_size, buffer_.size());
+    const auto ret = stream->read(buffer_, length);
 
-    if (ret < 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return -1;  // Stream error
     } else if (ret == 0) {
       break;  // End of stream, no more data
     }
 
-    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret);
+    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
     read_size += ret;
     if (contentStream.fail()) {
       return -1;
@@ -162,9 +161,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
 
-    int maxCaptureSizeProperty;
-    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty);
-    size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty);
+    size_t maxCaptureSize;
+    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize);
 
     std::string contentStr = contentStream.str();
 
@@ -212,7 +210,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   } else {
     flowFile_->setAttribute(attrKey, contentStream.str());
   }
-  return read_size;
+  return gsl::narrow<int64_t>(read_size);
 }
 
 ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx,  std::shared_ptr<logging::Logger> lgr)
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 6d4b2ef..7a7d920 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -47,14 +47,14 @@ class LogAttribute : public core::Processor {
    * Create a new processor
    */
   explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {})
-      : Processor(std::move(name), uuid),
+      : Processor(name, uuid),
         flowfiles_to_log_(1),
         hexencode_(false),
         max_line_length_(80U),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }
   // Destructor
-  virtual ~LogAttribute() = default;
+  ~LogAttribute() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "LogAttribute";
   // Supported Properties
@@ -103,16 +103,14 @@ class LogAttribute : public core::Processor {
         : logger_(std::move(logger))
         , buffer_(size)  {
     }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      if (buffer_.size() == 0U) {
-        return 0U;
-      }
-      int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size()));
-      if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) {
-        logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (buffer_.empty()) return 0U;
+      const auto ret = stream->read(buffer_.data(), buffer_.size());
+      if (ret != buffer_.size()) {
+        logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
         throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
       }
-      return buffer_.size();
+      return gsl::narrow<int64_t>(buffer_.size());
     }
     std::shared_ptr<logging::Logger> logger_;
     std::vector<uint8_t> buffer_;
@@ -123,7 +121,7 @@ class LogAttribute : public core::Processor {
   // OnTrigger method, implemented by NiFi LogAttribute
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi LogAttribute
-  void initialize(void) override;
+  void initialize() override;
 
  private:
   uint64_t flowfiles_to_log_;
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 3765856..2060867 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -25,10 +25,12 @@
 #include <memory>
 #include <string>
 #include <set>
+#include <utility>
 #ifdef WIN32
 #include <Windows.h>
 #endif
 #include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
 }
 #endif
 
-PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
-    : tmp_file_(tmp_file),
-      dest_file_(dest_file) {
+PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
+    : tmp_file_(std::move(tmp_file)),
+      dest_file_(std::move(dest_file)) {
 }
 
 // Copy the entire file contents to the temporary file
@@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
   std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
 
   do {
-    int read = stream->read(buffer, 1024);
-
-    if (read < 0) {
-      return -1;
-    }
-
-    if (read == 0) {
-      break;
-    }
-
-    tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+    const auto read = stream->read(buffer, 1024);
+    if (read == static_cast<size_t>(-1)) return -1;
+    if (read == 0) break;
+    tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
     size += read;
   } while (size < stream->size());
 
@@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
     write_succeeded_ = true;
   }
 
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index c09bffb..487213b 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -82,7 +82,7 @@ class PutFile : public core::Processor {
 
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(const std::string &tmp_file, const std::string &dest_file);
+    ReadCallback(std::string tmp_file, std::string dest_file);
     ~ReadCallback() override;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     bool commit();
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 75b2d4c..e33b3c5 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
@@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() {
 }
 
 template<typename T>
-void AtomicEntryStream<T>::seek(uint64_t offset) {
+void AtomicEntryStream<T>::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(entry_lock_);
   offset_ = gsl::narrow<size_t>(offset);
 }
@@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
 }
 
 template<typename T>
-int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (nullptr != buf && !invalid_stream_) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
-    int len = buflen;
+    auto len = buflen;
     core::repository::RepoValue<T> *value;
     if (entry_->getValue(key_, &value)) {
       if (offset_ + len > value->getBufferSize()) {
@@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
       return len;
     }
   }
-  return -1;
+  return static_cast<size_t>(-1);
 }
 
 }  // namespace io
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 3f0bfdc..7ab2b9e 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -35,7 +35,7 @@ class BufferStream : public BaseStream {
  public:
   BufferStream() = default;
 
-  BufferStream(const uint8_t *buf, const unsigned int len) {
+  BufferStream(const uint8_t *buf, const size_t len) {
     write(buf, len);
   }
 
@@ -48,7 +48,7 @@ class BufferStream : public BaseStream {
 
   int write(const uint8_t* data, int len) final;
 
-  int read(uint8_t* buffer, int len) override;
+  size_t read(uint8_t* buffer, size_t len) override;
 
   int initialize() override {
     buffer_.clear();
@@ -56,7 +56,7 @@ class BufferStream : public BaseStream {
     return 0;
   }
 
-  void seek(uint64_t offset) override {
+  void seek(size_t offset) override {
     readOffset_ += offset;
   }
 
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 2204537..abe0762 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
  public:
   using InputStream::read;
 
-  int read(uint8_t *buf, int buflen) override {
-    int ret = child_stream_->read(buf, buflen);
-    if (ret > 0) {
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = child_stream_->read(buf, buflen);
+    if (ret > 0 && ret != static_cast<size_t>(-1)) {
       crc_ = crc32(crc_, buf, ret);
     }
     return ret;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 37626c8..398a1d5 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -159,7 +159,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  int read(uint8_t *buf, int buflen) override {
+  size_t read(uint8_t *buf, size_t buflen) override {
     return read(buf, buflen, true);
   }
 
@@ -169,7 +169,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+  virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes);
 
  protected:
   /**
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index a7ea713..ee5aec7 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 3156a30..ad7f0dc 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -63,7 +63,7 @@ class FileStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -77,7 +77,7 @@ class FileStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 60a297c..cebf266 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -39,32 +39,32 @@ class InputStream : public virtual Stream {
    * reads a byte array from the stream
    * @param value reference in which will set the result
    * @param len length to read
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) for ClientSocket EAGAIN
    **/
-  virtual int read(uint8_t *value, int len) = 0;
+  virtual size_t read(uint8_t *value, size_t len) = 0;
 
-  int read(std::vector<uint8_t>& buffer, int len);
+  size_t read(std::vector<uint8_t>& buffer, size_t len);
 
   /**
    * read string from stream
    * @param str reference string
    * @return resulting read size
    **/
-  int read(std::string &str, bool widen = false);
+  size_t read(std::string &str, bool widen = false);
 
   /**
    * read a bool from stream
    * @param value reference to the output
    * @return resulting read size
    **/
-  int read(bool& value);
+  size_t read(bool& value);
 
   /**
    * read a uuid from stream
    * @param value reference to the output
    * @return resulting read size
    **/
-  int read(utils::Identifier& value);
+  size_t read(utils::Identifier& value);
 
   /**
   * reads sizeof(Integral) bytes from the stream
@@ -72,10 +72,10 @@ class InputStream : public virtual Stream {
   * @return resulting read size
   **/
   template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
-  int read(Integral& value) {
+  size_t read(Integral& value) {
     uint8_t buf[sizeof(Integral)]{};
     if (read(buf, sizeof(Integral)) != sizeof(Integral)) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
 
     value = 0;
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e067b52..cb528e5 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -31,7 +31,7 @@ class Stream {
  public:
   virtual void close() {}
 
-  virtual void seek(uint64_t /*offset*/) {
+  virtual void seek(size_t /*offset*/) {
     throw std::runtime_error("Seek is not supported");
   }
 
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b3fafce..8c88568 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
   uint8_t buffer[4096U];
   int64_t totalTransferred = 0;
   while (true) {
-    int readRet = src->read(buffer, sizeof(buffer));
-    if (readRet < 0) {
-      return readRet;
-    }
-    if (readRet == 0) {
-      break;
-    }
-    int remaining = readRet;
+    const auto readRet = src->read(buffer, sizeof(buffer));
+    if (readRet == static_cast<size_t>(-1)) return -1;
+    if (readRet == 0) break;
+    auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
       int writeRet = dst->write(buffer + transferred, remaining);
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 7699910..5f285c1 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return -1;
@@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index 6bf741d..24df1d9 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -136,14 +136,14 @@ class TLSSocket : public Socket {
   using Socket::read;
   using Socket::write;
 
-  int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override;
+  size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * Write value to the stream using uint8_t ptr
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 3b7b4f7..60ee8c2 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
   uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
     const auto size = std::min<size_t>(72, bufferSize);
-    org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size));
+    org::apache::nifi::minifi::io::BufferStream outStream(buffer, size);
 
     std::string uuid;
-    int ret = outStream.read(uuid);
-
-    if (ret <= 0) {
+    const auto uuidret = outStream.read(uuid);
+    if (uuidret <= 0) {
       return 0;
     }
 
     uint32_t eventType;
-    ret = outStream.read(eventType);
-    if (ret != 4) {
+    const auto typeret = outStream.read(eventType);
+    if (typeret != 4) {
       return 0;
     }
 
     uint64_t event_time;
-
-    ret = outStream.read(event_time);
-    if (ret != 8) {
+    const auto timeret = outStream.read(event_time);
+    if (timeret != 8) {
       return 0;
     }
 
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index a82e3b4..d2138ea 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     return stream_->write(data, len);
   }
 
-  int read(uint8_t* data, int len) override {
+  size_t read(uint8_t* data, size_t len) override {
     return stream_->read(data, len);
   }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index fc1dc91..7a41e81 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -19,6 +19,7 @@
 #ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 #define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 
+#include <algorithm>
 #include <map>
 #include <memory>
 #include <string>
@@ -30,6 +31,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
 #include "core/Connectable.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -65,11 +67,6 @@ class SiteToSiteClient : public core::Connectable {
         _batchSendNanos(5000000000),
         ssl_context_service_(nullptr),
         logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
-    _supportedVersion[0] = 5;
-    _supportedVersion[1] = 4;
-    _supportedVersion[2] = 3;
-    _supportedVersion[3] = 2;
-    _supportedVersion[4] = 1;
     _currentVersion = _supportedVersion[0];
     _currentVersionIndex = 0;
     _supportedCodecVersion[0] = 1;
@@ -77,7 +74,7 @@ class SiteToSiteClient : public core::Connectable {
     _currentCodecVersionIndex = 0;
   }
 
-  virtual ~SiteToSiteClient() = default;
+  ~SiteToSiteClient() override = default;
 
   void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
     ssl_context_service_ = context_service;
@@ -189,13 +186,13 @@ class SiteToSiteClient : public core::Connectable {
     return logger_;
   }
 
-  virtual void yield() {
+  void yield() override {
   }
 
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() override {
     return running_;
   }
 
@@ -203,7 +200,7 @@ class SiteToSiteClient : public core::Connectable {
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
@@ -234,12 +231,12 @@ class SiteToSiteClient : public core::Connectable {
   virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
   // getRespondCodeContext
   virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) {
-      if (SiteToSiteRequest::respondCodeContext[i].code == code) {
-        return &SiteToSiteRequest::respondCodeContext[i];
+    for (auto & i : SiteToSiteRequest::respondCodeContext) {
+      if (i.code == code) {
+        return &i;
       }
     }
-    return NULL;
+    return nullptr;
   }
 
   // Peer State
@@ -254,7 +251,7 @@ class SiteToSiteClient : public core::Connectable {
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
 
-  std::atomic<bool> running_;
+  std::atomic<bool> running_{false};
 
   // transaction map
   std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
@@ -265,10 +262,10 @@ class SiteToSiteClient : public core::Connectable {
   /***
    * versioning
    */
-  uint32_t _supportedVersion[5];
+  uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
   uint32_t _currentVersion;
   int _currentVersionIndex;
-  uint32_t _supportedCodecVersion[1];
+  uint32_t _supportedCodecVersion[1] = {1};
   uint32_t _currentCodecVersion;
   int _currentCodecVersionIndex;
 
@@ -286,13 +283,13 @@ class WriteCallback : public OutputStreamCallback {
   }
   DataPacket *_packet;
   // void process(std::ofstream *stream) {
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     uint8_t buffer[16384];
     uint64_t len = _packet->_size;
     uint64_t total = 0;
     while (len > 0) {
-      int size = len < 16384 ? static_cast<int>(len) : 16384;
-      int ret = _packet->transaction_->getStream().read(buffer, size);
+      const auto size = std::min(len, size_t{16384});
+      const auto ret = _packet->transaction_->getStream().read(buffer, size);
       if (ret != size) {
         logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
         return -1;
@@ -302,7 +299,7 @@ class WriteCallback : public OutputStreamCallback {
       total += size;
     }
     logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
-    return len;
+    return gsl::narrow<int64_t>(len);
   }
 };
 // Nest Callback Class for read stream
@@ -312,21 +309,15 @@ class ReadCallback : public InputStreamCallback {
       : _packet(packet) {
   }
   DataPacket *_packet;
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     _packet->_size = 0;
     uint8_t buffer[8192] = { 0 };
-    int readSize;
     size_t size = 0;
     do {
-      readSize = stream->read(buffer, 8192);
-
-      if (readSize == 0) {
-        break;
-      }
-      if (readSize < 0) {
-        return -1;
-      }
-      int ret = _packet->transaction_->getStream().write(buffer, readSize);
+      const auto readSize = stream->read(buffer, 8192);
+      if (readSize == 0) break;
+      if (readSize == static_cast<size_t>(-1)) return -1;
+      const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
       if (ret != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
         return -1;
@@ -334,7 +325,7 @@ class ReadCallback : public InputStreamCallback {
       size += readSize;
     } while (size < stream->size());
     _packet->_size = size;
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 };
 
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 84e3362..ace72b6 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,6 +24,7 @@
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback {
     if (stream->size() > 0) {
       vec.resize(stream->size());
 
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
-    return vec.size();
+    return gsl::narrow<int64_t>(vec.size());
   }
 
   virtual void seek(size_t) { }
@@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback {
 
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
-  virtual const std::vector<char> to_string();
+  virtual std::vector<char> to_string();
 
   virtual void close();
 
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index 440b158..00facc4 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -72,8 +72,8 @@ namespace utils {
 
 #define SMART_ENUM_BODY(Clazz, ...) \
     constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
-    Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
-    Clazz(const char* str) : value_{parse(str).value_} {} \
+    explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+    explicit Clazz(const char* str) : value_{parse(str).value_} {} \
    private: \
     Type value_; \
    public: \
diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h
index 1343fae..ee68db5 100644
--- a/libminifi/include/utils/FileOutputCallback.h
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -51,7 +51,7 @@ class FileOutputCallback : public ByteOutputCallback {
 
   int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
-  const std::vector<char> to_string() override;
+  std::vector<char> to_string() override;
 
   void close() override;
 
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 813137b..047c571 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -161,11 +161,9 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
 }
 
 std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
-  int ret;
-
   auto file = std::make_shared<FlowFileRecord>();
 
-  ret = inStream.read(file->event_time_);
+  auto ret = inStream.read(file->event_time_);
   if (ret != 8) {
     return {};
   }
@@ -181,12 +179,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
   }
 
   ret = inStream.read(file->uuid_);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
   ret = inStream.read(container);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
@@ -200,12 +198,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     ret = inStream.read(key, true);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return {};
     }
     std::string value;
     ret = inStream.read(value, true);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return {};
     }
     file->attributes_[key] = value;
@@ -213,7 +211,7 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
 
   std::string content_full_path;
   ret = inStream.read(content_full_path);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 8322302..73b74ff 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -101,8 +101,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::START:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if ( size != static_cast<size_t>(-1) ) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->start();
@@ -115,8 +115,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::STOP:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if ( size != static_cast<size_t>(-1) ) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->stop();
@@ -129,8 +129,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::CLEAR:
         {
           std::string connection;
-          int size = stream->read(connection);
-          if ( size != -1 ) {
+          const auto size = stream->read(connection);
+          if ( size != static_cast<size_t>(-1) ) {
             update_sink_->clearConnection(connection);
           }
         }
@@ -138,18 +138,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::UPDATE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
+          const auto size = stream->read(what);
+          if (size == static_cast<size_t>(-1)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "flow") {
             std::string ff_loc;
-            int size = stream->read(ff_loc);
+            const auto size = stream->read(ff_loc);
             std::ifstream tf(ff_loc);
             std::string configuration((std::istreambuf_iterator<char>(tf)),
                 std::istreambuf_iterator<char>());
-            if (size == -1) {
+            if (size == static_cast<size_t>(-1)) {
               logger_->log_debug("Connection broke");
               break;
             }
@@ -160,15 +160,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::DESCRIBE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
+          const auto size = stream->read(what);
+          if (size == static_cast<size_t>(-1)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "queue") {
             std::string connection;
-            int size = stream->read(connection);
-            if (size == -1) {
+            const auto size_ = stream->read(connection);
+            if (size_ == static_cast<size_t>(-1)) {
               logger_->log_debug("Connection broke");
               break;
             }
@@ -184,8 +184,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "components") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
+            resp.write(size_);
             for (const auto &component : update_sink_->getAllComponents()) {
               resp.write(component->getComponentName());
               resp.write(component->isRunning() ? "true" : "false");
@@ -210,8 +210,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "connections") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(queue_full_.size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(queue_full_.size());
+            resp.write(size_);
             for (const auto &connection : queue_full_) {
               resp.write(connection.first, false);
             }
@@ -220,17 +220,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             std::vector<std::string> full_connections;
             {
               std::lock_guard<std::mutex> lock(controller_mutex_);
-              for (auto conn : queue_full_) {
-                if (conn.second == true) {
+              for (const auto& conn : queue_full_) {
+                if (conn.second) {
                   full_connections.push_back(conn.first);
                 }
               }
             }
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
+            const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
             resp.write(full_connection_count);
-            for (auto conn : full_connections) {
+            for (const auto& conn : full_connections) {
               resp.write(conn);
             }
             stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size()));
@@ -248,11 +248,11 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
 void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
   for (const auto &payload_content : content) {
     if (payload_content.name == "Components") {
-      for (auto content : payload_content.operation_arguments) {
+      for (const auto& content_ : payload_content.operation_arguments) {
         bool is_enabled = false;
-        minifi::utils::StringUtils::StringToBool(content.second.to_string(), is_enabled);
+        minifi::utils::StringUtils::StringToBool(content_.second.to_string(), is_enabled);
         std::lock_guard<std::mutex> lock(controller_mutex_);
-        component_map_[content.first] = is_enabled;
+        component_map_[content_.first] = is_enabled;
       }
     }
   }
@@ -261,21 +261,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse
 int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
   if (server_socket_ == nullptr)
     return 0;
-  const std::vector<C2ContentResponse> &content = payload.getContent();
   for (const auto &pc : payload.getNestedPayloads()) {
     if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") {
       for (const auto &metrics_payload : pc.getNestedPayloads()) {
         if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") {
           for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
-            auto metric_content = queue_metrics.getContent();
-            for (const auto &payload_content : queue_metrics.getContent()) {
+            const auto& metric_content = queue_metrics.getContent();
+            for (const auto &payload_content : metric_content) {
               uint64_t size = 0;
               uint64_t max = 0;
-              for (auto content : payload_content.operation_arguments) {
+              for (const auto& content : payload_content.operation_arguments) {
                 if (content.first == "datasize") {
-                  size = std::stol(content.second.to_string());
+                  size = std::stoull(content.second.to_string());
                 } else if (content.first == "datasizemax") {
-                  max = std::stol(content.second.to_string());
+                  max = std::stoull(content.second.to_string());
                 }
               }
               std::lock_guard<std::mutex> lock(controller_mutex_);
@@ -293,7 +292,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
     }
   }
 
-  parse_content(content);
+  parse_content(payload.getContent());
 
   std::vector<uint8_t> buffer;
   buffer.resize(1024);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index f8e8c31..6a4077e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -325,8 +325,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<
  *
  */
 void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
-  std::shared_ptr<ResourceClaim> claim = content_session_->create();
-  int max_read = getpagesize();
+  const std::shared_ptr<ResourceClaim> claim = content_session_->create();
+  const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
   std::vector<uint8_t> charBuffer(max_read);
 
   try {
@@ -336,10 +336,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
     if (nullptr == content_stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
     }
-    int position = 0;
-    const int max_size = gsl::narrow<int>(stream.size());
+    size_t position = 0;
+    const auto max_size = stream.size();
     while (position < max_size) {
-      const int read_size = std::min(max_read, max_size - position);
+      const auto read_size = std::min(max_read, max_size - position);
       stream.read(charBuffer, read_size);
 
       content_stream->write(charBuffer.data(), read_size);
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 8a21f73..d62fac8 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,6 +24,7 @@
 
 #include "core/logging/LoggerConfiguration.h"
 #include "io/BaseStream.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
   size_t size = 0;
   uint8_t buffer[8192];
   do {
-    int read = stream->read(buffer, 8192);
-    if (read < 0) {
-      return -1;
-    }
-    if (read == 0) {
-      break;
-    }
+    const auto read = stream->read(buffer, 8192);
+    if (read == static_cast<size_t>(-1)) return -1;
+    if (read == 0) break;
     if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
       return -1;
     }
     size += read;
   } while (size < stream->size());
   _writeSucceeded = true;
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 6d3ebab..12220ef 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) {
   return size;
 }
 
-int BufferStream::read(uint8_t *buf, int len) {
-  gsl_Expects(len >= 0);
-  int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_);
-  len = std::min(len, bytes_available_in_buffer);
-  auto begin = buffer_.begin() + readOffset_;
-  std::copy(begin, begin + len, buf);
+size_t BufferStream::read(uint8_t *buf, size_t len) {
+  const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
+  const auto readlen = std::min(len, bytes_available_in_buffer);
+  auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+  std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 
   // increase offset for the next read
-  readOffset_ += len;
+  readOffset_ += readlen;
 
-  return len;
+  return readlen;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 6427447..cc65a8b 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -47,6 +47,8 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/file/FileUtils.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+
 namespace util = org::apache::nifi::minifi::utils;
 namespace mio = org::apache::nifi::minifi::io;
 
@@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) {
   return bytes;
 }
 
-int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
-  gsl_Expects(buflen >= 0);
-  int32_t total_read = 0;
+size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) {
+  size_t total_read = 0;
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
@@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         logger_->log_debug("fd %d close %i", fd, buflen);
         utils::file::FileUtils::close(socket_file_descriptor_);
       }
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
+    const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
     logger_->log_trace("Recv call %d", bytes_read);
     if (bytes_read <= 0) {
       if (bytes_read == 0) {
@@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         int err = WSAGetLastError();
         if (err == WSAEWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err);
 #else
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno));
 
 #endif  // WIN32
       }
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    buflen -= bytes_read;
+    buflen -= gsl::narrow<size_t>(bytes_read);
     buf += bytes_read;
-    total_read += bytes_read;
+    total_read += gsl::narrow<size_t>(bytes_read);
     if (!retrieve_all_bytes) {
       break;
     }
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 2b5fc70..6330b5e 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd)
       logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
 }
 
-void DescriptorStream::seek(uint64_t offset) {
+void DescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int DescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t DescriptorStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (!IsNullOrEmpty(buf)) {
 #ifdef WIN32
-    auto size_read = _read(fd_, buf, buflen);
+    const auto size_read = _read(fd_, buf, buflen);
 #else
-    auto size_read = ::read(fd_, buf, buflen);
+    const auto size_read = ::read(fd_, buf, buflen);
 #endif
 
     if (size_read < 0) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    return  size_read;
+    return gsl::narrow<size_t>(size_read);
 
   } else {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 4f759ae..c88f8dd 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -97,13 +97,13 @@ void FileStream::close() {
   file_stream_.reset();
 }
 
-void FileStream::seek(uint64_t offset) {
+void FileStream::seek(size_t offset) {
   std::lock_guard<std::mutex> lock(file_lock_);
   if (file_stream_ == nullptr || !file_stream_->is_open()) {
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
     return;
   }
-  offset_ = gsl::narrow<size_t>(offset);
+  offset_ = offset;
   file_stream_->clear();
   if (!file_stream_->seekg(offset_))
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG;
@@ -142,8 +142,7 @@ int FileStream::write(const uint8_t *value, int size) {
   }
 }
 
-int FileStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t FileStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -151,32 +150,31 @@ int FileStream::read(uint8_t *buf, int buflen) {
     std::lock_guard<std::mutex> lock(file_lock_);
     if (file_stream_ == nullptr || !file_stream_->is_open()) {
       logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+    file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen));
     if (file_stream_->eof() || file_stream_->fail()) {
       file_stream_->clear();
       seekToEndOfFile(READ_ERROR_MSG);
       auto tellg_result = file_stream_->tellg();
       if (tellg_result == std::streampos(-1)) {
         logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG;
-        return -1;
+        return static_cast<size_t>(-1);
       }
-      size_t len = gsl::narrow<size_t>(tellg_result);
+      const auto len = gsl::narrow<size_t>(tellg_result);
       size_t ret = len - offset_;
       offset_ = len;
       length_ = len;
       logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
-      return gsl::narrow<int>(ret);
+      return ret;
     } else {
       offset_ += buflen;
-      file_stream_->seekp(offset_);
+      file_stream_->seekp(gsl::narrow<std::streamoff>(offset_));
       return buflen;
     }
-
   } else {
     logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG;
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cdd101e..cd4166b 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -30,42 +30,43 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-int InputStream::read(std::vector<uint8_t>& buffer, int len) {
-  if (buffer.size() < gsl::narrow<size_t>(len)) {
+size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
+  if (buffer.size() < len) {
     buffer.resize(len);
   }
-  int ret = read(buffer.data(), len);
-  buffer.resize((std::max)(ret, 0));
+  const auto ret = read(buffer.data(), len);
+  if (ret == static_cast<size_t>(-1)) return ret;
+  buffer.resize((std::max)(ret, size_t{0}));
   return ret;
 }
 
-int InputStream::read(bool &value) {
+size_t InputStream::read(bool &value) {
   uint8_t buf = 0;
 
   if (read(&buf, 1) != 1) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   value = buf;
   return 1;
 }
 
-int InputStream::read(utils::Identifier &value) {
+size_t InputStream::read(utils::Identifier &value) {
   std::string uuidStr;
-  int ret = read(uuidStr);
+  const auto ret = read(uuidStr);
   if (ret < 0) {
     return ret;
   }
   auto optional_uuid = utils::Identifier::parse(uuidStr);
   if (!optional_uuid) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   value = optional_uuid.value();
   return ret;
 }
 
-int InputStream::read(std::string &str, bool widen) {
+size_t InputStream::read(std::string &str, bool widen) {
   uint32_t len = 0;
-  int ret = 0;
+  size_t ret = 0;
   if (!widen) {
     uint16_t shortLength = 0;
     ret = read(shortLength);
@@ -84,9 +85,9 @@ int InputStream::read(std::string &str, bool widen) {
   }
 
   std::vector<uint8_t> buffer(len);
-  uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
+  const auto bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
   if (bytes_read != len) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 
   str = std::string(reinterpret_cast<const char*>(buffer.data()), len);
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c5b0b3f..66d23b5 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -34,7 +34,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl)
       logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) {
 }
 
-void SecureDescriptorStream::seek(uint64_t offset) {
+void SecureDescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -71,34 +71,29 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
-  if (!IsNullOrEmpty(buf)) {
-    int total_read = 0;
-      int status = 0;
-      while (buflen) {
-        int sslStatus;
-        do {
-          status = SSL_read(ssl_, buf, buflen);
-          sslStatus = SSL_get_error(ssl_, status);
-        } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+  if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+  size_t total_read = 0;
+  uint8_t* writepos = buf;
+  while (buflen > total_read) {
+    int status;
+    int sslStatus;
+    do {
+      status = SSL_read(ssl_, writepos, gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+      sslStatus = SSL_get_error(ssl_, status);
+    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
-        if (status < 0)
-              break;
+    if (status < 0)
+      break;
 
-        buflen -= status;
-        buf += status;
-        total_read += status;
-      }
-
-      return total_read;
-
-  } else {
-    return -1;
+    writepos += status;
+    total_read += gsl::narrow<size_t>(status);
   }
+
+  return total_read;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 776fa44..49d07ec 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -351,36 +351,37 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
   return -1;
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) {
+  size_t total_read = 0;
   int status = 0;
   int loc = 0;
   int16_t fd = select_descriptor(1000);
   if (fd < 0) {
     close();
-    return -1;
+    return static_cast<size_t>(-1);
   }
   auto fd_ssl = get_ssl(fd);
   if (IsNullOrEmpty(fd_ssl)) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   if (!SSL_pending(fd_ssl)) {
     return 0;
   }
   while (buflen) {
     if (fd <= 0) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
     int sslStatus;
     do {
-      status = SSL_read(fd_ssl, buf + loc, buflen);
+      status = SSL_read(fd_ssl, buf + loc, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
 
-    buflen -= status;
+    if (status < 0) break;
+
+    buflen -= gsl::narrow<size_t>(status);
     loc += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
@@ -418,33 +419,32 @@ int TLSSocket::write(const uint8_t *value, int size) {
   return writeData(value, size, fd);
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
+  size_t total_read = 0;
   int status = 0;
   while (buflen) {
-    int16_t fd = select_descriptor(1000);
+    const int16_t fd = select_descriptor(1000);
     if (fd < 0) {
       close();
-      return -1;
+      return static_cast<size_t>(-1);
     }
 
     int sslStatus;
     do {
       auto fd_ssl = get_ssl(fd);
       if (IsNullOrEmpty(fd_ssl)) {
-        return -1;
+        return static_cast<size_t>(-1);
       }
-      status = SSL_read(fd_ssl, buf, buflen);
+      status = SSL_read(fd_ssl, buf, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
     if (status < 0)
       break;
 
-    buflen -= status;
+    buflen -= gsl::narrow<size_t>(status);
     buf += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f9af7f9..2fac08b 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -235,12 +235,10 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
 }
 
 bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
-  int ret;
-
   org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
 
-  ret = outStream.read(uuid_);
-  if (ret <= 0) {
+  auto ret = outStream.read(uuid_);
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -272,22 +270,22 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   }
 
   ret = outStream.read(this->_componentId);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->_componentType);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->flow_uuid_);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->_details);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -301,19 +299,19 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     ret = outStream.read(key);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     std::string value;
     ret = outStream.read(value);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     this->_attributes[key] = value;
   }
 
   ret = outStream.read(this->_contentFullPath);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -328,7 +326,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   }
 
   ret = outStream.read(this->_sourceQueueIdentifier);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -343,7 +341,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
       ret = outStream.read(parentUUID);
-      if (ret <= 0) {
+      if (ret == static_cast<size_t>(-1)) {
         return false;
       }
       this->addParentUuid(parentUUID);
@@ -356,23 +354,23 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
       ret = outStream.read(childUUID);
-      if (ret <= 0) {
+      if (ret == static_cast<size_t>(-1)) {
         return false;
       }
       this->addChildUuid(childUUID);
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
   }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7ea213f..0eb1f7e 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -342,40 +342,40 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     }
 
     uint32_t number;
-    status = peer_->read(number);
+    auto ret = peer_->read(number);
 
-    if (status <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       tearDown();
       return false;
     }
 
     for (uint32_t i = 0; i < number; i++) {
       std::string host;
-      status = peer_->read(host);
-      if (status <= 0) {
+      ret = peer_->read(host);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint32_t port;
-      status = peer_->read(port);
-      if (status <= 0) {
+      ret = peer_->read(port);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint8_t secure;
-      status = peer_->read(secure);
-      if (status <= 0) {
+      ret = peer_->read(secure);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint32_t count;
-      status = peer_->read(count);
-      if (status <= 0) {
+      ret = peer_->read(count);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
-      PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
-      peers.push_back(std::move(status));
+      PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
+      peers.push_back(std::move(peer_status));
       logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
     }
 
@@ -397,15 +397,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
   int RawSiteToSiteClient::readRequestType(RequestType &type) {
     std::string requestTypeStr;
 
-    int ret = peer_->read(requestTypeStr);
-
-    if (ret <= 0)
-      return ret;
+    const auto ret = peer_->read(requestTypeStr);
+    if (ret == static_cast<size_t>(-1))
+      return gsl::narrow_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
       if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
         type = (RequestType) i;
-        return ret;
+        return gsl::narrow_cast<int>(ret);
       }
     }
 
@@ -417,7 +416,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac
 }
 
 int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
-  return writeResponse(transaction, code, message);
+  return writeResponse(transaction, code, std::move(message));
 }
 
 bool RawSiteToSiteClient::negotiateCodec() {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 1833144..fb24f73 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -32,7 +32,7 @@ namespace sitetosite {
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
 
-  int ret = peer_->read(firstByte);
+  auto ret = peer_->read(firstByte);
 
   if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
     return -1;
@@ -48,27 +48,25 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
 
   ret = peer_->read(thirdByte);
 
-  if (ret <= 0)
-    return ret;
+  if (ret != static_cast<size_t>(-1))
+    return gsl::narrow_cast<int>(ret);
 
   code = (RespondCode) thirdByte;
 
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-  if (resCode == NULL) {
-    // Not a valid respond code
+  if (!resCode) {
     return -1;
   }
   if (resCode->hasDescription) {
     ret = peer_->read(message);
-    if (ret <= 0)
+    if (ret != static_cast<size_t>(-1))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
 }
 
 void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   auto it = this->known_transactions_.find(transactionID);
 
@@ -85,7 +83,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
 int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
-  if (resCode == NULL) {
+  if (!resCode) {
     // Not a valid respond code
     return -1;
   }
@@ -205,7 +203,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
 
 bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
   int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -529,8 +527,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
 }
 
 bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
-  int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -568,9 +565,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     RespondCode code;
     std::string message;
 
-    ret = readResponse(transaction, code, message);
-
-    if (ret <= 0) {
+    if (readResponse(transaction, code, message) <= 0) {
       return false;
     }
     if (code == CONTINUE_TRANSACTION) {
@@ -595,8 +590,8 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   // start to read the packet
   uint32_t numAttributes;
-  ret = transaction->getStream().read(numAttributes);
-  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+  auto ret = transaction->getStream().read(numAttributes);
+  if (ret != static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {
     return false;
   }
 
@@ -606,11 +601,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     std::string key;
     std::string value;
     ret = transaction->getStream().read(key, true);
-    if (ret <= 0) {
+    if (ret != static_cast<size_t>(-1)) {
       return false;
     }
     ret = transaction->getStream().read(value, true);
-    if (ret <= 0) {
+    if (ret != static_cast<size_t>(-1)) {
       return false;
     }
     packet->_attributes[key] = value;
@@ -619,7 +614,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   uint64_t len;
   ret = transaction->getStream().read(len);
-  if (ret <= 0) {
+  if (ret != static_cast<size_t>(-1)) {
     return false;
   }
 
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 33c9634..a64c3ac 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   if (stream->size() > 0) {
     std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
     readFully(buffer.get(), stream->size());
-    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size()));
-    return stream->size();
+    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size());
+    return gsl::narrow<int64_t>(stream->size());
   }
-  return size_.load();
+  return gsl::narrow<int64_t>(size_.load());
 }
 
 int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
   stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written));
-  return stream->size();
+  return gsl::narrow<int64_t>(stream->size());
 }
 
 void StreamOutputCallback::write(char *data, size_t size) {
@@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) {
   write_and_notify(data, size);
 }
 
-const std::vector<char> ByteOutputCallback::to_string() {
+std::vector<char> ByteOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.resize(size_.load());
   readFully(buffer.data(), size_.load());
diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp
index 6dfa565..bbd1265 100644
--- a/libminifi/src/utils/FileOutputCallback.cpp
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -34,7 +34,7 @@ int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   return size_.load();
 }
 
-const std::vector<char> FileOutputCallback::to_string() {
+std::vector<char> FileOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.insert(std::end(buffer), std::begin(file_), std::end(file_));
   return buffer;
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index 2b4bf73..312ca76 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -26,14 +26,14 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
  public:
   explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
 
-  int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+  size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
     uint8_t tmpBuffer[4096]{};
     std::size_t remaining_len = len;
-    int total_read = 0;
+    size_t total_read = 0;
     while (remaining_len > 0) {
-      auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer))));
+      const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return ret;
       remaining_len -= ret;
       total_read += ret;
       auto prevSize = buffer_.size();
@@ -44,7 +44,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
   }
 
   int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), stream->size());
+    return static_cast<int64_t>(write(*stream.get(), stream->size()));
   }
 
  private:
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 76507df..3d0da83 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,29 +41,23 @@
 #include "processors/PutFile.h"
 #include "utils/file/FileUtils.h"
 #include "../Utils.h"
+#include "utils/gsl.h"
 
 class ReadCallback: public minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(size_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_ = nullptr;
-    archive_buffer_size_ = 0;
-  }
-  ~ReadCallback() {
-    if (buffer_)
-      delete[] buffer_;
-    if (archive_buffer_)
-      delete[] archive_buffer_;
-  }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  explicit ReadCallback(size_t size)
+      :buffer_size_{size}
+  {}
+  ~ReadCallback() override {
+    delete[] buffer_;
+    delete[] archive_buffer_;
+  }
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
     int64_t total_read = 0;
-    int64_t ret = 0;
     do {
-      ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_));
+      const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return -1;
       read_size_ += gsl::narrow<size_t>(ret);
       total_read += ret;
     } while (buffer_size_ != read_size_);
@@ -78,18 +72,18 @@ class ReadCallback: public minifi::InputStreamCallback {
     struct archive_entry *ae;
 
     REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
-    int size = gsl::narrow<int>(archive_entry_size(ae));
+    const auto size = archive_entry_size(ae);
     archive_buffer_ = new char[size];
     archive_buffer_size_ = size;
-    archive_read_data(a, archive_buffer_, size);
+    archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size));
     archive_read_free(a);
   }
 
-  uint8_t *buffer_;
   size_t buffer_size_;
-  size_t read_size_;
-  char *archive_buffer_;
-  int archive_buffer_size_;
+  uint8_t *buffer_ = new uint8_t[buffer_size_];
+  size_t read_size_ = 0;
+  char *archive_buffer_ = nullptr;
+  int64_t archive_buffer_size_ = 0;
 };
 
 /**
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 60e233f..074b153 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback {
     REQUIRE(size_ + len <= capacity_);
     int total_read = 0;
     do {
-      auto ret = input.read(end(), gsl::narrow<int>(len));
+      const auto ret = input.read(end(), len);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return -1;
       size_ += ret;
       len -= ret;
       total_read += ret;
@@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     return total_read;
   }
   int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), capacity_);
+    return write(*stream, capacity_);
   }
 
  private:
@@ -111,8 +111,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
   class ArchiveEntryReader {
    public:
     explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
-    int read(uint8_t* out, std::size_t len) {
-      return gsl::narrow<int>(archive_read_data(arch, out, len));
+    size_t read(uint8_t* out, std::size_t len) {
+      return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len));
     }
    private:
     archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 059dfa9..d2e6d51 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -56,12 +56,11 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
   str = "";
   uint8_t buffer[4096]{};
   while (true) {
-    auto ret = stream->read(buffer, sizeof(buffer));
+    const auto ret = stream->read(buffer, sizeof(buffer));
     REQUIRE(ret >= 0);
-    if (ret == 0) {
-      break;
-    }
-    str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)};
+    REQUIRE(ret != static_cast<size_t>(-1));
+    if (ret == 0) { break; }
+    str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
   return stream;
 }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 6409f3e..7b5c75c 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -104,7 +104,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
 }
 
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 88b3929..5e7c3b6 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
 
   minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
 
-  REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+  REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1));
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index b13dcfa..1ff4ab9 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1);
+  REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1));
 
   data = verifybuffer.data();
 
-  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty());
 
   std::remove(ss.str().c_str());
 }
@@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE(stream.read(readBuffer, 1) == -1);
+  REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
 }
 
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE_FALSE(stream.read(readBuffer, 11) == -1);
+  REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
   stream.seek(0);
-  REQUIRE(stream.read(nullptr, 11) == -1);
+  REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
 }
 
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index b4d5e8f..61257e2 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -59,7 +59,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
    * @param len length to read
    * @return resulting read size
    **/
-  int read(uint8_t *value, int len) override {
+  size_t read(uint8_t *value, size_t len) override {
     return server_responses_.read(value, len);
   }
 
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index b339a28..ca60e7f 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
     std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
                                                                                            *content_repo);
     auto stream = (*content_repo)->read(*claim);
-    return stream->read(target, size);
+    return static_cast<int>(stream->read(target, static_cast<size_t>(size)));
   } else {
     file_buffer fb = file_to_buffer(ff->contentLocation);
     if (size < 0) {
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index a1f8a08..ad3c086 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -39,6 +39,7 @@
 #include "core/cstructs.h"
 #include "RandomServerSocket.h"
 #include "core/log.h"
+#include "utils/gsl.h"
 
 #define FMT_DEFAULT fmt_lower
 
@@ -145,15 +146,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
   size_t read_len = 0;
   while(!found_codec) {
     uint8_t handshake_data[1000];
-    int actual_len = stream->read(handshake_data+read_len, 1000-read_len);
-    if(actual_len <= 0) {
+    const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
+    if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) {
       continue;
     }
     read_len += actual_len;
-    std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
-    auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
+    const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
+    const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
     if(it != incoming_data.end()){
-      size_t idx = std::distance(incoming_data.begin(), it);
+      const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it));
       // Actual version follows the string as an uint32_t // that should be the end of the buffer
       found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
     }