You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/05/26 12:46:08 UTC

[nifi-minifi-cpp] branch MINIFICPP-1507 created (now ea2b397)

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a change to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git.


      at ea2b397  mqtt: defer casting read result to int

This branch includes the following new commits:

     new 5c41fff  convert InputStream::read to size_t
     new ee2ac71  fix s2s issues
     new a4b439c  fix compilation on mac
     new 4c01eea  fix compilation on some platforms with size_t not the same as uint64_t
     new d4c4d80  fix compilation on some platforms with size_t not the same as uint64_t 2
     new e409070  Update Provenance.h: handle uuid read errors
     new f177e5d  review feedback, fix linter script on mac
     new cf5c879  introduce isError, fix a few issues
     new c035ad4  uint64_t -> size_t is narrowing on windows
     new 722180e  various review feedback-related changes
     new 3010f2b  fix issues
     new 6753eb1  edited the wrong PutSQL.cpp
     new 4fc6ef1  avoid undefined behavior (std::string(nullptr)) in LuaBaseStream::read
     new 54a48a9  fix warnings that caused CI failures
     new e8df736  Update extensions/libarchive/CompressContent.h
     new efd9dc6  fix various issues
     new ea2b397  mqtt: defer casting read result to int

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[nifi-minifi-cpp] 08/17: introduce isError, fix a few issues

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit cf5c879d8326b8721e358873f51260f476a093e7
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Mar 17 15:27:44 2021 +0100

    introduce isError, fix a few issues
---
 extensions/aws/processors/PutS3Object.h            |  2 +-
 extensions/libarchive/CompressContent.h            |  6 ++--
 extensions/libarchive/FocusArchiveEntry.cpp        |  2 +-
 extensions/librdkafka/PublishKafka.cpp             |  2 +-
 extensions/opc/src/putopc.cpp                      |  2 +-
 extensions/script/lua/LuaBaseStream.cpp            | 13 +++-----
 extensions/sftp/client/SFTPClient.cpp              |  2 +-
 .../processors/ExecuteProcess.cpp                  | 16 +++++-----
 .../standard-processors/processors/ExtractText.cpp |  2 +-
 .../standard-processors/processors/GetTCP.cpp      | 12 ++++----
 .../standard-processors/processors/PutFile.cpp     |  2 +-
 extensions/tensorflow/TFApplyGraph.cpp             | 20 +++++-------
 extensions/tensorflow/TFConvertImageToTensor.cpp   |  6 +---
 extensions/tensorflow/TFExtractTopLabels.cpp       | 18 +++++------
 libminifi/include/io/CRCStream.h                   |  2 +-
 libminifi/include/io/InputStream.h                 | 24 +++++++--------
 libminifi/include/io/Stream.h                      |  9 ++++++
 libminifi/include/io/StreamPipe.h                  |  2 +-
 libminifi/include/provenance/Provenance.h          |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |  2 +-
 libminifi/src/FlowControlProtocol.cpp              | 22 ++++++-------
 libminifi/src/FlowFileRecord.cpp                   | 10 +++---
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 32 ++++++++++---------
 libminifi/src/core/ProcessSessionReadCallback.cpp  |  2 +-
 libminifi/src/io/InputStream.cpp                   |  4 +--
 libminifi/src/provenance/Provenance.cpp            | 28 ++++++++---------
 libminifi/src/sitetosite/RawSocketProtocol.cpp     | 20 ++++++------
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 36 +++++++++++++---------
 libminifi/test/BufferReader.h                      |  2 +-
 .../test/archive-tests/CompressContentTests.cpp    |  2 +-
 libminifi/test/archive-tests/MergeFileTests.cpp    |  5 +--
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  2 +-
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  6 ++--
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |  2 +-
 libminifi/test/unit/FileStreamTests.cpp            |  8 ++---
 nanofi/tests/CSite2SiteTests.cpp                   |  2 +-
 thirdparty/google-styleguide/run_linter.sh         |  9 +++---
 37 files changed, 171 insertions(+), 167 deletions(-)

diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index e55ac1e..6cb8618 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -97,7 +97,7 @@ class PutS3Object : public S3Processor {
       while (read_size_ < flow_size_) {
         const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
         const auto read_ret = stream->read(buffer.data(), next_read_size);
-        if (read_ret == static_cast<size_t>(-1)) {
+        if (io::isError(read_ret)) {
           return -1;
         }
         if (read_ret > 0) {
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index df7fc75..f5e44fa 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -109,7 +109,7 @@ public:
       }
       while (read_size < flow_->getSize()) {
         const auto readret = stream->read(buffer, sizeof(buffer));
-        if (readret == static_cast<size_t>(-1)) {
+        if (io::isError(readret)) {
           status_ = -1;
           return -1;
         }
@@ -145,7 +145,7 @@ public:
       stream->seek(offset_);
       const auto readRet = stream->read(buffer_, sizeof(buffer_));
       read_size_ = readRet;
-      if (readRet != static_cast<size_t>(-1)) {
+      if (!io::isError(readRet)) {
         offset_ += readRet;
       }
       return gsl::narrow<int64_t>(readRet);
@@ -383,7 +383,7 @@ public:
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
             const auto ret = inputStream->read(buffer.data(), buffer.size());
-            if (ret == static_cast<size_t>(-1)) {
+            if (io::isError(ret)) {
               return -1;
             } else if (ret == 0) {
               break;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 9452dce..ea709bf 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -157,7 +157,7 @@ la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d,
   do {
     last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
     read += last_read;
-  } while (data->processor->isRunning() && last_read > 0 && last_read != static_cast<size_t>(-1) && read < 8196);
+  } while (data->processor->isRunning() && last_read > 0 && !io::isError(last_read) && read < 8196);
 
   if (!data->processor->isRunning()) {
     archive_set_error(a, EINTR, "Processor shut down during read");
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index bbb66a8..dd7bac8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -410,7 +410,7 @@ class ReadCallback : public InputStreamCallback {
 
     for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
       const auto readRet = stream->read(buffer.data(), buffer.size());
-      if (readRet == static_cast<size_t>(-1)) {
+      if (io::isError(readRet)) {
         status_ = -1;
         error_ = "Failed to read from stream";
         return read_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index e613d1b..daf0242 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -429,7 +429,7 @@ namespace processors {
 
     do {
       const auto read = stream->read(buf_.data() + size, 1024);
-      if (read == static_cast<size_t>(-1)) return -1;
+      if (io::isError(read)) return -1;
       if (read == 0) break;
       size += read;
     } while (size < stream->size());
diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaBaseStream.cpp
index 1387fae..883a13c 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -52,16 +52,11 @@ std::string LuaBaseStream::read(size_t len) {
   //     0 <= n < s.size()."
   //
   // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3337.pdf
-  auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), static_cast<int>(len));
-  if (read < 0) {
-    return nullptr;
-  }
-
-  if (gsl::narrow<size_t>(read) != len) {
-    buffer.resize(gsl::narrow<size_t>(read));
+  const auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), len);
+  if (!io::isError(read) && read != len) {
+    buffer.resize(read);
   }
-
-  return buffer;
+  return io::isError(read) ? std::string{} : buffer;
 }
 
 size_t LuaBaseStream::write(std::string buf) {
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 1e76cb8..7af066d 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -567,7 +567,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
   uint64_t total_read = 0U;
   do {
     const auto read_ret = input.read(buf.data(), buf.size());
-    if (read_ret == static_cast<size_t>(-1)) {
+    if (io::isError(read_ret)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
       return false;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 1dac5c6..cc9212d 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -160,11 +160,11 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
           while (1) {
             std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
             char buffer[4096];
-            int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+            const auto  numRead = read(_pipefd[0], buffer, sizeof(buffer));
             if (numRead <= 0)
               break;
-            logger_->log_debug("Execute Command Respond %d", numRead);
-            ExecuteProcess::WriteCallback callback(buffer, numRead);
+            logger_->log_debug("Execute Command Respond %zd", numRead);
+            ExecuteProcess::WriteCallback callback(buffer, gsl::narrow<uint64_t>(numRead));
             auto flowFile = session->create();
             if (!flowFile)
               continue;
@@ -177,13 +177,13 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
         } else {
           char buffer[4096];
           char *bufPtr = buffer;
-          int totalRead = 0;
+          size_t totalRead = 0;
           std::shared_ptr<core::FlowFile> flowFile = nullptr;
           while (true) {
-            int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
+            const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
             if (numRead <= 0) {
               if (totalRead > 0) {
-                logger_->log_debug("Execute Command Respond %d", totalRead);
+                logger_->log_debug("Execute Command Respond %zu", totalRead);
                 // child exits and close the pipe
                 ExecuteProcess::WriteCallback callback(buffer, totalRead);
                 if (!flowFile) {
@@ -200,9 +200,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
               }
               break;
             } else {
-              if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) {
+              if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
                 // we reach the max buffer size
-                logger_->log_debug("Execute Command Max Respond %d", sizeof(buffer));
+                logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
                 ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
                 if (!flowFile) {
                   flowFile = session->create();
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 0000b84..9774cf8 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -134,7 +134,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     const auto length = std::min(size_limit - read_size, buffer_.size());
     const auto ret = stream->read(buffer_, length);
 
-    if (ret == static_cast<size_t>(-1)) {
+    if (io::isError(ret)) {
       return -1;  // Stream error
     } else if (ret == 0) {
       break;  // End of stream, no more data
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index e55967f..cbbe535 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -167,12 +167,12 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
       do {
         if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
           buffer.resize(receive_buffer_size_);
-          int size_read = socket_ptr->read(buffer.data(), gsl::narrow<int>(receive_buffer_size_), false);
-          if (size_read >= 0) {
-            if (size_read > 0) {
+          const auto size_read = socket_ptr->read(buffer.data(), receive_buffer_size_, false);
+          if (size_read >= 0 && !io::isError(size_read)) {
+            if (size_read > 0 && !io::isError(size_read)) {
               // determine cut location
-              int startLoc = 0, i = 0;
-              for (; i < size_read; i++) {
+              size_t startLoc = 0;
+              for (size_t i = 0; i < size_read; i++) {
                 if (buffer.at(i) == endOfMessageByte && i > 0) {
                   if (i-startLoc > 0) {
                     handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true);
@@ -194,7 +194,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
               reconnects = 0;
             }
             socket_ring_buffer_.enqueue(std::move(socket_ptr));
-          } else if (size_read == -2 && stay_connected_) {
+          } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
             if (++reconnects > connection_attempt_limit_) {
               logger_->log_info("Too many reconnects, exiting thread");
               socket_ptr->close();
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 2060867..db60c89 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -322,7 +322,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
 
   do {
     const auto read = stream->read(buffer, 1024);
-    if (read == static_cast<size_t>(-1)) return -1;
+    if (io::isError(read)) return -1;
     if (read == 0) break;
     tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
     size += read;
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index 4419caa..14d349e 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -16,10 +16,12 @@
  */
 
 #include "TFApplyGraph.h"
-#include <core/ProcessContext.h>
-#include <core/ProcessSession.h>
 #include <tensorflow/cc/ops/standard_ops.h>
 
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -191,29 +193,23 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
 int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string graph_proto_buf;
   graph_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
   }
-
   graph_def_->ParseFromString(graph_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index 5f94548..aea09f8 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -321,14 +321,10 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr
   if (tensor_->AllocatedBytes() < stream->size()) {
     throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
   }
-
-  auto num_read = stream->read(tensor_->flat<unsigned char>().data(),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(tensor_->flat<unsigned char>().data(), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   return num_read;
 }
 
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index f0853c1..b73a3e5 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -19,6 +19,8 @@
 
 #include "tensorflow/cc/ops/standard_ops.h"
 
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -124,7 +126,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
 }
 
 int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  int64_t total_read = 0;
+  size_t total_read = 0;
   std::string label;
   uint64_t max_label_len = 65536;
   label.resize(max_label_len);
@@ -134,9 +136,8 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
   buf.resize(buf_size);
 
   while (total_read < stream->size()) {
-    auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), static_cast<int>(buf_size));
-
-    for (auto i = 0; i < read; i++) {
+    const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), buf_size);
+    for (size_t i = 0; i < read; i++) {
       if (buf[i] == '\n' || total_read + i == stream->size()) {
         labels_->emplace_back(label.substr(0, label_size));
         label_size = 0;
@@ -149,21 +150,18 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
     total_read += read;
   }
 
-  return total_read;
+  return gsl::narrow<int64_t>(total_read);
 }
 
 int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
   std::string tensor_proto_buf;
   tensor_proto_buf.resize(stream->size());
-  auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
-                                   static_cast<int>(stream->size()));
-
+  const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
   if (num_read != stream->size()) {
     throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
   }
-
   tensor_proto_->ParseFromString(tensor_proto_buf);
-  return num_read;
+  return gsl::narrow<int64_t>(num_read);
 }
 
 } /* namespace processors */
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index abe0762..b6ae751 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -86,7 +86,7 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
 
   size_t read(uint8_t *buf, size_t buflen) override {
     const auto ret = child_stream_->read(buf, buflen);
-    if (ret > 0 && ret != static_cast<size_t>(-1)) {
+    if (ret > 0 && !io::isError(ret)) {
       crc_ = crc32(crc_, buf, ret);
     }
     return ret;
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index cebf266..4360acc 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -36,41 +36,41 @@ class InputStream : public virtual Stream {
     throw std::runtime_error("Querying size is not supported");
   }
   /**
-   * reads a byte array from the stream
+   * Reads a byte array from the stream. Use isError (Stream.h) to check for errors.
    * @param value reference in which will set the result
    * @param len length to read
-   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) for ClientSocket EAGAIN
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) on EAGAIN
    **/
   virtual size_t read(uint8_t *value, size_t len) = 0;
 
   size_t read(std::vector<uint8_t>& buffer, size_t len);
 
   /**
-   * read string from stream
+   * Read string from stream. Use isError (Stream.h) to check for errors.
    * @param str reference string
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(std::string &str, bool widen = false);
 
   /**
-   * read a bool from stream
+   * Read a bool from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(bool& value);
 
   /**
-   * read a uuid from stream
+   * Read a uuid from stream. Use isError (Stream.h) to check for errors.
    * @param value reference to the output
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) on EAGAIN
    **/
   size_t read(utils::Identifier& value);
 
   /**
-  * reads sizeof(Integral) bytes from the stream
-  * @param value reference in which will set the result
-  * @return resulting read size
-  **/
+   * Reads sizeof(Integral) bytes from the stream. Use isError (Stream.h) to check for errors.
+   * @param value reference in which will set the result
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) on EAGAIN
+   **/
   template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
   size_t read(Integral& value) {
     uint8_t buf[sizeof(Integral)]{};
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index cb528e5..8cbff10 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -24,6 +24,15 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
+inline bool isError(const size_t read_return) noexcept {
+  return read_return == static_cast<size_t>(-1)   // general error
+      || read_return == static_cast<size_t>(-2);  // Socket EAGAIN, to be refactored to eliminate this error condition
+}
+
+inline bool isError(const int write_return) noexcept {
+  return write_return == -1;
+}
+
 /**
  * All streams serialize/deserialize in big-endian
  */
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index 8c88568..abac015 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -50,7 +50,7 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
   int64_t totalTransferred = 0;
   while (true) {
     const auto readRet = src->read(buffer, sizeof(buffer));
-    if (readRet == static_cast<size_t>(-1)) return -1;
+    if (io::isError(readRet)) return -1;
     if (readRet == 0) break;
     auto remaining = readRet;
     int transferred = 0;
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index c2fd38c..ed1cadc 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -367,7 +367,7 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
     std::string uuid;
     const auto uuidret = outStream.read(uuid);
-    if (uuidret == 0 || uuidret == static_cast<size_t>(-1)) {
+    if (uuidret == 0 || io::isError(uuidret)) {
       return 0;
     }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 36c3d96..b372ca3 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -306,7 +306,7 @@ class ReadCallback : public InputStreamCallback {
     do {
       const auto readSize = stream->read(buffer, 8192);
       if (readSize == 0) break;
-      if (readSize == static_cast<size_t>(-1)) return -1;
+      if (io::isError(readSize)) return -1;
       const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
       if (ret != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 8d8ad4a..79334c2 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -71,26 +71,26 @@ int FlowControlProtocol::selectClient(int msec) {
 }
 
 int FlowControlProtocol::readData(uint8_t *buf, int buflen) {
+  gsl_Expects(buflen >= 0);
   int sendSize = buflen;
 
   while (buflen) {
-    int status;
-    status = selectClient(MAX_READ_TIMEOUT);
-    if (status <= 0) {
-      return status;
+    const auto selectstatus = selectClient(MAX_READ_TIMEOUT);
+    if (selectstatus <= 0) {
+      return selectstatus;
     }
 #ifdef WIN32
-    status = _read(_socket, buf, buflen);
+    const auto readstatus = _read(_socket, buf, buflen);
 #elif !defined(__MACH__)
-    status = read(_socket, buf, buflen);
+    const auto readstatus = read(_socket, buf, gsl::narrow<size_t>(buflen));
 #else
-    status = recv(_socket, buf, buflen, 0);
+    const auto readstatus = recv(_socket, buf, buflen, 0);
 #endif
-    if (status <= 0) {
-      return status;
+    if (readstatus <= 0) {
+      return gsl::narrow<int>(readstatus);
     }
-    buflen -= status;
-    buf += status;
+    buflen -= readstatus;
+    buf += readstatus;
   }
 
   return sendSize;
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index f7b1638..89096a9 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -186,14 +186,14 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
 
   {
     const auto ret = inStream.read(file->uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
 
   {
     const auto ret = inStream.read(container);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
@@ -211,14 +211,14 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
     std::string key;
     {
       const auto ret = inStream.read(key, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return {};
       }
     }
     std::string value;
     {
       const auto ret = inStream.read(value, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return {};
       }
     }
@@ -228,7 +228,7 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
   std::string content_full_path;
   {
     const auto ret = inStream.read(content_full_path);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return {};
     }
   }
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 73b74ff..d441182 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -102,7 +102,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string componentStr;
           const auto size = stream->read(componentStr);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->start();
@@ -116,7 +116,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string componentStr;
           const auto size = stream->read(componentStr);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->stop();
@@ -130,7 +130,7 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string connection;
           const auto size = stream->read(connection);
-          if ( size != static_cast<size_t>(-1) ) {
+          if (!io::isError(size)) {
             update_sink_->clearConnection(connection);
           }
         }
@@ -138,21 +138,25 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::UPDATE:
         {
           std::string what;
-          const auto size = stream->read(what);
-          if (size == static_cast<size_t>(-1)) {
-            logger_->log_debug("Connection broke");
-            break;
+          {
+            const auto size = stream->read(what);
+            if (io::isError(size)) {
+              logger_->log_debug("Connection broke");
+              break;
+            }
           }
           if (what == "flow") {
             std::string ff_loc;
-            const auto size = stream->read(ff_loc);
+            {
+              const auto size = stream->read(ff_loc);
+              if (io::isError(size)) {
+                logger_->log_debug("Connection broke");
+                break;
+              }
+            }
             std::ifstream tf(ff_loc);
             std::string configuration((std::istreambuf_iterator<char>(tf)),
                 std::istreambuf_iterator<char>());
-            if (size == static_cast<size_t>(-1)) {
-              logger_->log_debug("Connection broke");
-              break;
-            }
             update_sink_->applyUpdate("ControllerSocketProtocol", configuration);
           }
         }
@@ -161,14 +165,14 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         {
           std::string what;
           const auto size = stream->read(what);
-          if (size == static_cast<size_t>(-1)) {
+          if (io::isError(size)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "queue") {
             std::string connection;
             const auto size_ = stream->read(connection);
-            if (size_ == static_cast<size_t>(-1)) {
+            if (io::isError(size_)) {
               logger_->log_debug("Connection broke");
               break;
             }
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index d62fac8..0d822c7 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -49,7 +49,7 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
   uint8_t buffer[8192];
   do {
     const auto read = stream->read(buffer, 8192);
-    if (read == static_cast<size_t>(-1)) return -1;
+    if (io::isError(read)) return -1;
     if (read == 0) break;
     if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
       return -1;
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cd4166b..be0a273 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -35,7 +35,7 @@ size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
     buffer.resize(len);
   }
   const auto ret = read(buffer.data(), len);
-  if (ret == static_cast<size_t>(-1)) return ret;
+  if (io::isError(ret)) return ret;
   buffer.resize((std::max)(ret, size_t{0}));
   return ret;
 }
@@ -75,7 +75,7 @@ size_t InputStream::read(std::string &str, bool widen) {
     ret = read(len);
   }
 
-  if (ret <= 0) {
+  if (ret == 0 || isError(ret)) {
     return ret;
   }
 
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 0dd9cfc..f3b0715 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -239,7 +239,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
 
   {
     const auto ret = outStream.read(uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -283,28 +283,28 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_componentId);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->_componentType);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->flow_uuid_);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
 
   {
     const auto ret = outStream.read(this->_details);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -322,14 +322,14 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
     std::string key;
     {
       const auto ret = outStream.read(key);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     std::string value;
     {
       const auto ret = outStream.read(value);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
@@ -338,7 +338,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_contentFullPath);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -359,7 +359,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
 
   {
     const auto ret = outStream.read(this->_sourceQueueIdentifier);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -378,7 +378,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
       utils::Identifier parentUUID;
       {
         const auto ret = outStream.read(parentUUID);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -395,7 +395,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
       utils::Identifier childUUID;
       {
         const auto ret = outStream.read(childUUID);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -404,20 +404,20 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     {
       const auto ret = outStream.read(this->_transitUri);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     {
       const auto ret = outStream.read(this->_transitUri);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 9dce59d..463a96f 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -135,7 +135,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
   uint8_t statusCode;
   {
     const auto ret = peer_->read(statusCode);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       logger_->log_debug("result of writing version status code  %i", ret);
       return false;
     }
@@ -149,7 +149,7 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
       uint32_t serverVersion;
       {
         const auto ret = peer_->read(serverVersion);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -202,7 +202,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
   uint8_t statusCode;
   {
     const auto ret = peer_->read(statusCode);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
@@ -214,7 +214,7 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
       uint32_t serverVersion;
       {
         const auto ret = peer_->read(serverVersion);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           return false;
         }
       }
@@ -359,7 +359,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     uint32_t number;
     {
       const auto ret = peer_->read(number);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         tearDown();
         return false;
       }
@@ -369,7 +369,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       std::string host;
       {
         const auto ret = peer_->read(host);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -377,7 +377,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint32_t port;
       {
         const auto ret = peer_->read(port);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -385,7 +385,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint8_t secure;
       {
         const auto ret = peer_->read(secure);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -393,7 +393,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
       uint32_t count;
       {
         const auto ret = peer_->read(count);
-        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        if (ret == 0 || io::isError(ret)) {
           tearDown();
           return false;
         }
@@ -422,7 +422,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     std::string requestTypeStr;
 
     const auto ret = peer_->read(requestTypeStr);
-    if (ret == 0 || ret == static_cast<size_t>(-1))
+    if (ret == 0 || io::isError(ret))
       return static_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 7e4a38d..c7cd919 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,19 +31,25 @@ namespace sitetosite {
 
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
-  auto ret = peer_->read(firstByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != CODE_SEQUENCE_VALUE_1)
-    return -1;
+  {
+    const auto ret = peer_->read(firstByte);
+    if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
+      return -1;
+  }
 
   uint8_t secondByte;
-  ret = peer_->read(secondByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1) || secondByte != CODE_SEQUENCE_VALUE_2)
-    return -1;
+  {
+    const auto ret = peer_->read(secondByte);
+    if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
+      return -1;
+  }
 
   uint8_t thirdByte;
-  ret = peer_->read(thirdByte);
-  if (ret == 0 || ret == static_cast<size_t>(-1))
-    return gsl::narrow_cast<int>(ret);
+  {
+    const auto ret = peer_->read(thirdByte);
+    if (ret == 0 || io::isError(ret))
+      return static_cast<int>(ret);
+  }
 
   code = (RespondCode) thirdByte;
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
@@ -51,8 +57,8 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
     return -1;
   }
   if (resCode->hasDescription) {
-    ret = peer_->read(message);
-    if (ret != static_cast<size_t>(-1))
+    const auto ret = peer_->read(message);
+    if (ret == 0 || !io::isError(ret))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
@@ -584,7 +590,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
   uint32_t numAttributes;
   {
     const auto ret = transaction->getStream().read(numAttributes);
-    if (ret == 0 || ret == static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {
+    if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
       return false;
     }
   }
@@ -596,13 +602,13 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     std::string value;
     {
       const auto ret = transaction->getStream().read(key, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
     {
       const auto ret = transaction->getStream().read(value, true);
-      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      if (ret == 0 || io::isError(ret)) {
         return false;
       }
     }
@@ -613,7 +619,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
   uint64_t len;
   {
     const auto ret = transaction->getStream().read(len);
-    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+    if (ret == 0 || io::isError(ret)) {
       return false;
     }
   }
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index 312ca76..63bce60 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -33,7 +33,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
     while (remaining_len > 0) {
       const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return ret;
+      if (minifi::io::isError(ret)) return ret;
       remaining_len -= ret;
       total_read += ret;
       auto prevSize = buffer_.size();
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 3d0da83..c11f47a 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -57,7 +57,7 @@ class ReadCallback: public minifi::InputStreamCallback {
     do {
       const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return -1;
+      if (minifi::io::isError(ret)) return -1;
       read_size_ += gsl::narrow<size_t>(ret);
       total_read += ret;
     } while (buffer_size_ != read_size_);
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 074b153..4114672 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -90,7 +90,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     do {
       const auto ret = input.read(end(), len);
       if (ret == 0) break;
-      if (ret == static_cast<size_t>(-1)) return -1;
+      if (minifi::io::isError(ret)) return -1;
       size_ += ret;
       len -= ret;
       total_read += ret;
@@ -112,7 +112,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
    public:
     explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
     size_t read(uint8_t* out, std::size_t len) {
-      return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len));
+      const auto ret = archive_read_data(arch, out, len);
+      return ret < 0 ? static_cast<size_t>(-1) : gsl::narrow<size_t>(ret);
     }
    private:
     archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index d2e6d51..77b7c73 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -58,7 +58,7 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
   while (true) {
     const auto ret = stream->read(buffer, sizeof(buffer));
     REQUIRE(ret >= 0);
-    REQUIRE(ret != static_cast<size_t>(-1));
+    REQUIRE(!minifi::io::isError(ret));
     if (ret == 0) { break; }
     str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 7b5c75c..4d5e5a3 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -103,8 +103,8 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
+  // error tell us we have an invalid stream
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 5e7c3b6..69336b8 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
 
   minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
 
-  REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(nonExistingStream.read(nullptr, 0)));
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index 1ff4ab9..57bae57 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -173,7 +173,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(nullptr, stream.size())));
 
   data = verifybuffer.data();
 
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(readBuffer, 1)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
 }
 
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1));
+  REQUIRE_FALSE(minifi::io::isError(stream.read(readBuffer, 11)));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
   stream.seek(0);
-  REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1));
+  REQUIRE(minifi::io::isError(stream.read(nullptr, 11)));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
 }
 
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index ad3c086..821d8e6 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -147,7 +147,7 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
   while(!found_codec) {
     uint8_t handshake_data[1000];
     const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
-    if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) {
+    if(actual_len == 0 || minifi::io::isError(actual_len)) {
       continue;
     }
     read_len += actual_len;
diff --git a/thirdparty/google-styleguide/run_linter.sh b/thirdparty/google-styleguide/run_linter.sh
index 5015c94..9fa4271 100755
--- a/thirdparty/google-styleguide/run_linter.sh
+++ b/thirdparty/google-styleguide/run_linter.sh
@@ -38,9 +38,8 @@ done
 [ x"$INCLUDE_DIRS" == x"" ] && echo "WARNING: No include directories specified."
 [ x"$SOURCE_DIRS" == x"" ] && echo "ERROR: No source directories specified." && exit 1
 
-HEADERS=`find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' '`
-SOURCES=`find $SOURCE_DIRS -name  '*.cpp' | sort | uniq | tr '\n' ' '`
+HEADERS=$(find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' ')
+SOURCES=$(find $SOURCE_DIRS -name  '*.cpp' | sort | uniq | tr '\n' ' ')
 # this realpath alternative should work on mac
-alias prealpath="python -c 'import os, sys; print(os.path.realpath(sys.argv[1]))'"
-REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + "../.."))' $(dirname "$0"))"
-python ${SCRIPT_DIR}/cpplint.py --linelength=200 --repository="$REPOSITORY" ${HEADERS} ${SOURCES}
+REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + "../.."))' "$(dirname "$0")")"
+python "${SCRIPT_DIR}"/cpplint.py --linelength=200 --repository="$REPOSITORY" ${HEADERS} ${SOURCES}

[nifi-minifi-cpp] 13/17: avoid undefined behavior (std::string(nullptr)) in LuaBaseStream::read

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4fc6ef141ba1bc1c81b6cf7fe6cf73a6bfb2baf6
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 18 15:26:35 2021 +0100

    avoid undefined behavior (std::string(nullptr)) in LuaBaseStream::read
---
 extensions/script/lua/LuaBaseStream.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaBaseStream.cpp
index 883a13c..3545c77 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -39,7 +39,7 @@ std::string LuaBaseStream::read(size_t len) {
   }
 
   if (len <= 0) {
-    return nullptr;
+    return std::string{};
   }
 
   std::string buffer;

[nifi-minifi-cpp] 14/17: fix warnings that caused CI failures

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 54a48a9d5ead789a5685962684c7c8a3a4039f4b
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 18 17:41:22 2021 +0100

    fix warnings that caused CI failures
---
 extensions/coap/protocols/CoapC2Protocol.h           |  2 +-
 extensions/libarchive/CompressContent.h              | 16 ++++++----------
 extensions/standard-processors/processors/GetTCP.cpp |  4 ++--
 libminifi/include/sitetosite/SiteToSiteClient.h      |  2 +-
 4 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
index 46aee45..dfee5a7 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -49,7 +49,7 @@ namespace coap {
 namespace c2 {
 
 #define REQUIRE_VALID(x) \
-  if (-1 == x){ \
+  if (io::isError(x)){ \
     return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR); \
   }
 
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index f5e44fa..8f456d8 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -189,16 +189,11 @@ public:
       return ret;
     }
 
-    static la_ssize_t archive_read(struct archive *arch, void *context, const void **buff) {
-      WriteCallback *callback = (WriteCallback *) context;
+    static la_ssize_t archive_read(struct archive*, void *context, const void **buff) {
+      auto *callback = (WriteCallback *) context;
       callback->session_->read(callback->flow_, &callback->readDecompressCb_);
-      if (callback->readDecompressCb_.read_size_ >= 0) {
-        *buff = callback->readDecompressCb_.buffer_;
-        return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
-      } else {
-        archive_set_error(arch, EIO, "Error reading flowfile");
-        return -1;
-      }
+      *buff = callback->readDecompressCb_.buffer_;
+      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
     }
 
     static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -388,7 +383,8 @@ public:
             } else if (ret == 0) {
               break;
             } else {
-              if (outputStream_->write(buffer.data(), ret) != ret) {
+              const auto writeret = outputStream_->write(buffer.data(), ret);
+              if (!io::isError(writeret) && gsl::narrow<size_t>(writeret) != ret) {
                 return -1;
               }
               read_size += ret;
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index cbbe535..e215f898 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -168,8 +168,8 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
         if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
           buffer.resize(receive_buffer_size_);
           const auto size_read = socket_ptr->read(buffer.data(), receive_buffer_size_, false);
-          if (size_read >= 0 && !io::isError(size_read)) {
-            if (size_read > 0 && !io::isError(size_read)) {
+          if (!io::isError(size_read)) {
+            if (size_read != 0) {
               // determine cut location
               size_t startLoc = 0;
               for (size_t i = 0; i < size_read; i++) {
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index b372ca3..d142bfa 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -308,7 +308,7 @@ class ReadCallback : public InputStreamCallback {
       if (readSize == 0) break;
       if (io::isError(readSize)) return -1;
       const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
-      if (ret != readSize) {
+      if (ret < 0 || gsl::narrow<size_t>(ret) != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
         return -1;
       }

[nifi-minifi-cpp] 07/17: review feedback, fix linter script on mac

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit f177e5d46f9ce793b9d3c2727cba7da2ae0686ac
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Mar 17 10:31:36 2021 +0100

    review feedback, fix linter script on mac
---
 extensions/script/python/PyBaseStream.cpp          |   2 +-
 extensions/sftp/client/SFTPClient.cpp              |   2 +-
 .../standard-processors/processors/ExtractText.cpp |   2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    |  38 ++--
 libminifi/src/FlowFileRecord.cpp                   |  88 ++++++----
 libminifi/src/provenance/Provenance.cpp            | 194 +++++++++++++--------
 libminifi/src/sitetosite/RawSocketProtocol.cpp     |  57 +++---
 libminifi/src/sitetosite/SiteToSiteClient.cpp      |  34 ++--
 thirdparty/google-styleguide/run_linter.sh         |   4 +-
 9 files changed, 250 insertions(+), 171 deletions(-)

diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index 1733994..5e583f7 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -50,7 +50,7 @@ py::bytes PyBaseStream::read(size_t len) {
   std::vector<uint8_t> buffer(len);
 
   const auto read = stream_->read(buffer.data(), len);
-  return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
+  return py::bytes(reinterpret_cast<char *>(buffer.data()), read);
 }
 
 size_t PyBaseStream::write(const py::bytes& buf) {
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index 8d528ab..1e76cb8 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -577,7 +577,7 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     }
     logger_->log_trace("Read %d bytes", read_ret);
     total_read += read_ret;
-    auto remaining = gsl::narrow<size_t>(read_ret);
+    auto remaining = read_ret;
     while (remaining > 0) {
       const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 888ec47..0000b84 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -161,7 +161,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
 
-    const size_t maxCaptureSize = [this]{
+    const size_t maxCaptureSize = [this] {
       uint64_t val;
       ctx_->getProperty(MaxCaptureGroupLen.getName(), val);
       return size_t{val};
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index be1d327..36c3d96 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -45,15 +45,14 @@ namespace sitetosite {
  */
 class DataPacket {
  public:
-  DataPacket(const std::shared_ptr<logging::Logger> &logger, const std::shared_ptr<Transaction> &transaction, std::map<std::string, std::string> attributes, const std::string &payload)
-      : payload_(payload),
-        logger_reference_(logger) {
-    _size = 0;
-    transaction_ = transaction;
-    _attributes = attributes;
+  DataPacket(std::shared_ptr<logging::Logger> logger, std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> attributes, const std::string &payload)
+      : _attributes{std::move(attributes)},
+        transaction_{std::move(transaction)},
+        payload_{payload},
+        logger_reference_{std::move(logger)} {
   }
   std::map<std::string, std::string> _attributes;
-  uint64_t _size;
+  uint64_t _size{0};
   std::shared_ptr<Transaction> transaction_;
   const std::string & payload_;
   std::shared_ptr<logging::Logger> logger_reference_;
@@ -62,16 +61,7 @@ class DataPacket {
 class SiteToSiteClient : public core::Connectable {
  public:
   SiteToSiteClient()
-      : core::Connectable("SitetoSiteClient"),
-        peer_state_(IDLE),
-        _batchSendNanos(5000000000),
-        ssl_context_service_(nullptr),
-        logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
-    _currentVersion = _supportedVersion[0];
-    _currentVersionIndex = 0;
-    _supportedCodecVersion[0] = 1;
-    _currentCodecVersion = _supportedCodecVersion[0];
-    _currentCodecVersionIndex = 0;
+      : core::Connectable("SitetoSiteClient") {
   }
 
   ~SiteToSiteClient() override = default;
@@ -240,7 +230,7 @@ class SiteToSiteClient : public core::Connectable {
   }
 
   // Peer State
-  PeerState peer_state_;
+  PeerState peer_state_{PeerState::IDLE};
 
   // portId
   utils::Identifier port_id_;
@@ -257,22 +247,22 @@ class SiteToSiteClient : public core::Connectable {
   std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
 
   // BATCH_SEND_NANOS
-  uint64_t _batchSendNanos;
+  uint64_t _batchSendNanos{5000000000};
 
   /***
    * versioning
    */
   uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
-  uint32_t _currentVersion;
-  int _currentVersionIndex;
+  int _currentVersionIndex{0};
+  uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
   uint32_t _supportedCodecVersion[1] = {1};
-  uint32_t _currentCodecVersion;
-  int _currentCodecVersionIndex;
+  int _currentCodecVersionIndex{0};
+  uint32_t _currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
 
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
 
  private:
-  std::shared_ptr<logging::Logger> logger_;
+  std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<SiteToSiteClient>::getLogger()};
 };
 
 // Nest Callback Class for write stream
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 047c571..f7b1638 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -163,66 +163,88 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
 std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
   auto file = std::make_shared<FlowFileRecord>();
 
-  auto ret = inStream.read(file->event_time_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->event_time_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->entry_date_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->entry_date_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->lineage_start_date_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->lineage_start_date_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->uuid_);
-  if (ret == static_cast<size_t>(-1)) {
-    return {};
+  {
+    const auto ret = inStream.read(file->uuid_);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return {};
+    }
   }
 
-  ret = inStream.read(container);
-  if (ret == static_cast<size_t>(-1)) {
-    return {};
+  {
+    const auto ret = inStream.read(container);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return {};
+    }
   }
 
   // read flow attributes
   uint32_t numAttributes = 0;
-  ret = inStream.read(numAttributes);
-  if (ret != 4) {
-    return {};
+  {
+    const auto ret = inStream.read(numAttributes);
+    if (ret != 4) {
+      return {};
+    }
   }
 
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
-    ret = inStream.read(key, true);
-    if (ret == static_cast<size_t>(-1)) {
-      return {};
+    {
+      const auto ret = inStream.read(key, true);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return {};
+      }
     }
     std::string value;
-    ret = inStream.read(value, true);
-    if (ret == static_cast<size_t>(-1)) {
-      return {};
+    {
+      const auto ret = inStream.read(value, true);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return {};
+      }
     }
     file->attributes_[key] = value;
   }
 
   std::string content_full_path;
-  ret = inStream.read(content_full_path);
-  if (ret == static_cast<size_t>(-1)) {
-    return {};
+  {
+    const auto ret = inStream.read(content_full_path);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->size_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->size_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
-  ret = inStream.read(file->offset_);
-  if (ret != 8) {
-    return {};
+  {
+    const auto ret = inStream.read(file->offset_);
+    if (ret != 8) {
+      return {};
+    }
   }
 
   file->claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo);
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index 2fac08b..0dd9cfc 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -237,141 +237,189 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
 bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
   org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
 
-  auto ret = outStream.read(uuid_);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(uuid_);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
   uint32_t eventType;
-  ret = outStream.read(eventType);
-  if (ret != 4) {
-    return false;
+  {
+    const auto ret = outStream.read(eventType);
+    if (ret != 4) {
+      return false;
+    }
   }
-  this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
 
-  ret = outStream.read(this->_eventTime);
-  if (ret != 8) {
-    return false;
+  this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
+  {
+    const auto ret = outStream.read(this->_eventTime);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_entryDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_entryDate);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_eventDuration);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_eventDuration);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_lineageStartDate);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_lineageStartDate);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_componentId);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_componentId);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_componentType);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_componentType);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->flow_uuid_);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->flow_uuid_);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_details);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_details);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
   // read flow attributes
   uint32_t numAttributes = 0;
-  ret = outStream.read(numAttributes);
-  if (ret != 4) {
-    return false;
+  {
+    const auto ret = outStream.read(numAttributes);
+    if (ret != 4) {
+      return false;
+    }
   }
 
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
-    ret = outStream.read(key);
-    if (ret == static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = outStream.read(key);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
     std::string value;
-    ret = outStream.read(value);
-    if (ret == static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = outStream.read(value);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
     this->_attributes[key] = value;
   }
 
-  ret = outStream.read(this->_contentFullPath);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_contentFullPath);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_size);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_size);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_offset);
-  if (ret != 8) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_offset);
+    if (ret != 8) {
+      return false;
+    }
   }
 
-  ret = outStream.read(this->_sourceQueueIdentifier);
-  if (ret == static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = outStream.read(this->_sourceQueueIdentifier);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
   if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
     // read UUIDs
     uint32_t number = 0;
-    ret = outStream.read(number);
-    if (ret != 4) {
-      return false;
+    {
+      const auto ret = outStream.read(number);
+      if (ret != 4) {
+        return false;
+      }
     }
 
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
-      ret = outStream.read(parentUUID);
-      if (ret == static_cast<size_t>(-1)) {
-        return false;
+      {
+        const auto ret = outStream.read(parentUUID);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
       this->addParentUuid(parentUUID);
     }
     number = 0;
-    ret = outStream.read(number);
-    if (ret != 4) {
-      return false;
+    {
+      const auto ret = outStream.read(number);
+      if (ret != 4) {
+        return false;
+      }
     }
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
-      ret = outStream.read(childUUID);
-      if (ret == static_cast<size_t>(-1)) {
-        return false;
+      {
+        const auto ret = outStream.read(childUUID);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
       this->addChildUuid(childUUID);
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
-    ret = outStream.read(this->_transitUri);
-    if (ret == static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_transitUri);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
-    ret = outStream.read(this->_transitUri);
-    if (ret == static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_transitUri);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
-    ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-    if (ret == static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
   }
 
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index c75eb17..9dce59d 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -357,37 +357,46 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     }
 
     uint32_t number;
-    auto ret = peer_->read(number);
-
-    if (ret == static_cast<size_t>(-1)) {
-      tearDown();
-      return false;
+    {
+      const auto ret = peer_->read(number);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        tearDown();
+        return false;
+      }
     }
 
     for (uint32_t i = 0; i < number; i++) {
       std::string host;
-      ret = peer_->read(host);
-      if (ret == static_cast<size_t>(-1)) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(host);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          tearDown();
+          return false;
+        }
       }
       uint32_t port;
-      ret = peer_->read(port);
-      if (ret == static_cast<size_t>(-1)) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(port);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          tearDown();
+          return false;
+        }
       }
       uint8_t secure;
-      ret = peer_->read(secure);
-      if (ret == static_cast<size_t>(-1)) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(secure);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          tearDown();
+          return false;
+        }
       }
       uint32_t count;
-      ret = peer_->read(count);
-      if (ret == static_cast<size_t>(-1)) {
-        tearDown();
-        return false;
+      {
+        const auto ret = peer_->read(count);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          tearDown();
+          return false;
+        }
       }
       PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
       peers.push_back(std::move(peer_status));
@@ -413,13 +422,13 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     std::string requestTypeStr;
 
     const auto ret = peer_->read(requestTypeStr);
-    if (ret == static_cast<size_t>(-1))
-      return gsl::narrow_cast<int>(ret);
+    if (ret == 0 || ret == static_cast<size_t>(-1))
+      return static_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
       if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
         type = (RequestType) i;
-        return gsl::narrow_cast<int>(ret);
+        return static_cast<int>(ret);
       }
     }
 
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index b009c18..7e4a38d 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -42,7 +42,7 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
 
   uint8_t thirdByte;
   ret = peer_->read(thirdByte);
-  if (ret == static_cast<size_t>(-1))
+  if (ret == 0 || ret == static_cast<size_t>(-1))
     return gsl::narrow_cast<int>(ret);
 
   code = (RespondCode) thirdByte;
@@ -582,9 +582,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   // start to read the packet
   uint32_t numAttributes;
-  auto ret = transaction->getStream().read(numAttributes);
-  if (ret != static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {
-    return false;
+  {
+    const auto ret = transaction->getStream().read(numAttributes);
+    if (ret == 0 || ret == static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {
+      return false;
+    }
   }
 
   // read the attributes
@@ -592,22 +594,28 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
   for (unsigned int i = 0; i < numAttributes; i++) {
     std::string key;
     std::string value;
-    ret = transaction->getStream().read(key, true);
-    if (ret != static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = transaction->getStream().read(key, true);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
-    ret = transaction->getStream().read(value, true);
-    if (ret != static_cast<size_t>(-1)) {
-      return false;
+    {
+      const auto ret = transaction->getStream().read(value, true);
+      if (ret == 0 || ret == static_cast<size_t>(-1)) {
+        return false;
+      }
     }
     packet->_attributes[key] = value;
     logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
   }
 
   uint64_t len;
-  ret = transaction->getStream().read(len);
-  if (ret != static_cast<size_t>(-1)) {
-    return false;
+  {
+    const auto ret = transaction->getStream().read(len);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
 
   packet->_size = len;
diff --git a/thirdparty/google-styleguide/run_linter.sh b/thirdparty/google-styleguide/run_linter.sh
index ca0a1ad..5015c94 100755
--- a/thirdparty/google-styleguide/run_linter.sh
+++ b/thirdparty/google-styleguide/run_linter.sh
@@ -40,5 +40,7 @@ done
 
 HEADERS=`find $INCLUDE_DIRS -name '*.h' | sort | uniq | tr '\n' ' '`
 SOURCES=`find $SOURCE_DIRS -name  '*.cpp' | sort | uniq | tr '\n' ' '`
-REPOSITORY="$(realpath --physical "$(dirname "$0")/../..")"
+# this realpath alternative should work on mac
+alias prealpath="python -c 'import os, sys; print(os.path.realpath(sys.argv[1]))'"
+REPOSITORY="$(python -c 'import os, sys; print(os.path.realpath(sys.argv[1] + "../.."))' $(dirname "$0"))"
 python ${SCRIPT_DIR}/cpplint.py --linelength=200 --repository="$REPOSITORY" ${HEADERS} ${SOURCES}

[nifi-minifi-cpp] 16/17: fix various issues

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit efd9dc647219c69439b2b785b83cdac2e04b5921
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 25 12:11:59 2021 +0100

    fix various issues
---
 extensions/libarchive/CompressContent.h | 39 ++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index aae9492..b93471f 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -44,7 +44,7 @@ namespace minifi {
 namespace processors {
 
 // CompressContent Class
-class CompressContent: public core::Processor {
+class CompressContent : public core::Processor {
 public:
   // Constructor
   /*!
@@ -120,7 +120,7 @@ public:
             status_ = -1;
             return -1;
           }
-          read_size += ret;
+          read_size += gsl::narrow<uint64_t>(ret);
         } else {
           break;
         }
@@ -134,26 +134,24 @@ public:
     std::shared_ptr<logging::Logger> logger_;
   };
   // Nest Callback Class for read stream from flow for decompress
-  class ReadCallbackDecompress: public InputStreamCallback {
-  public:
+  struct ReadCallbackDecompress : InputStreamCallback {
     explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
-        flow_(std::move(flow)) {
+        flow_file(std::move(flow)) {
     }
     ~ReadCallbackDecompress() override = default;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      read_size_ = 0;
-      stream->seek(offset_);
-      const auto readRet = stream->read(buffer_, sizeof(buffer_));
-      read_size_ = readRet;
+      stream->seek(offset);
+      const auto readRet = stream->read(buffer, sizeof(buffer));
+      stream_read_result = readRet;
       if (!io::isError(readRet)) {
-        offset_ += readRet;
+        offset += readRet;
       }
       return gsl::narrow<int64_t>(readRet);
     }
-    size_t read_size_ = 0;
-    uint8_t buffer_[8192] = {0};
-    size_t offset_ = 0;
-    std::shared_ptr<core::FlowFile> flow_;
+    size_t stream_read_result = 0;  // read size or error code, to be checked with io::isError
+    uint8_t buffer[8192] = {0};
+    size_t offset = 0;
+    std::shared_ptr<core::FlowFile> flow_file;
   };
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
@@ -192,8 +190,9 @@ public:
     static la_ssize_t archive_read(struct archive*, void *context, const void **buff) {
       auto *callback = (WriteCallback *) context;
       callback->session_->read(callback->flow_, &callback->readDecompressCb_);
-      *buff = callback->readDecompressCb_.buffer_;
-      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
+      *buff = callback->readDecompressCb_.buffer;
+      if (io::isError(callback->readDecompressCb_.stream_read_result)) return -1;
+      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result);
     }
 
     static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -420,14 +419,14 @@ public:
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
   // OnTrigger method, implemented by NiFi CompressContent
-  virtual void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) {
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
   }
   // OnTrigger method, implemented by NiFi CompressContent
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi CompressContent
-  virtual void initialize(void);
+  void initialize() override;
 
 private:
   static std::string toMimeType(CompressionFormat format);

[nifi-minifi-cpp] 09/17: uint64_t -> size_t is narrowing on windows

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit c035ad43ac76dffa74dd58ba90cb9fc50a88f4cd
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Mar 17 15:30:31 2021 +0100

    uint64_t -> size_t is narrowing on windows
---
 libminifi/src/io/BufferStream.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index be0b68b..bd0a9d2 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -38,7 +38,7 @@ int BufferStream::write(const uint8_t *value, int size) {
 
 size_t BufferStream::read(uint8_t *buf, size_t len) {
   const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
-  const auto readlen = std::min(len, size_t{bytes_available_in_buffer});
+  const auto readlen = std::min(len, gsl::narrow<size_t>(bytes_available_in_buffer));
   const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
   std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 

[nifi-minifi-cpp] 12/17: edited the wrong PutSQL.cpp

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 6753eb1de6aac46e4d54ad4ef3f35a84c17ff679
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 18 13:52:14 2021 +0100

    edited the wrong PutSQL.cpp
---
 extensions/sql/processors/PutSQL.cpp | 1 -
 1 file changed, 1 deletion(-)

diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index 9b44c5b..ee8b4ce 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -33,7 +33,6 @@
 #include "Exception.h"
 #include "data/DatabaseConnectors.h"
 #include "data/JSONSQLWriter.h"
-#include "utils/gsl.h"
 
 namespace org {
 namespace apache {

[nifi-minifi-cpp] 03/17: fix compilation on mac

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a4b439cabd02866fd8519417886c02c335f8fe11
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Tue Mar 16 12:30:22 2021 +0100

    fix compilation on mac
---
 libminifi/include/sitetosite/SiteToSiteClient.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index 7a41e81..be1d327 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -288,7 +288,7 @@ class WriteCallback : public OutputStreamCallback {
     uint64_t len = _packet->_size;
     uint64_t total = 0;
     while (len > 0) {
-      const auto size = std::min(len, size_t{16384});
+      const auto size = std::min(len, uint64_t{16384});
       const auto ret = _packet->transaction_->getStream().read(buffer, size);
       if (ret != size) {
         logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;

[nifi-minifi-cpp] 17/17: mqtt: defer casting read result to int

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ea2b397fc5d4d45888194649f8fbe233655c5a9c
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 25 12:20:49 2021 +0100

    mqtt: defer casting read result to int
---
 extensions/mqtt/processors/PublishMQTT.h | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index 745246f..164ade0 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -87,15 +87,15 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       while (read_size_ < flow_size_) {
         // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-        int readRet = static_cast<int>(stream->read(&buffer[0], max_seg_size_));
-        if (readRet < 0) {
+        const auto readRet = stream->read(&buffer[0], max_seg_size_);
+        if (io::isError(readRet)) {
           status_ = -1;
           return gsl::narrow<int64_t>(read_size_);
         }
         if (readRet > 0) {
           MQTTClient_message pubmsg = MQTTClient_message_initializer;
           pubmsg.payload = &buffer[0];
-          pubmsg.payloadlen = readRet;
+          pubmsg.payloadlen = gsl::narrow<int>(readRet);
           pubmsg.qos = qos_;
           pubmsg.retained = retain_;
           if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {

[nifi-minifi-cpp] 05/17: fix compilation on some platforms with size_t not the same as uint64_t 2

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit d4c4d80dc39ea25cfeebcf0a18543943681ce259
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Tue Mar 16 14:12:35 2021 +0100

    fix compilation on some platforms with size_t not the same as uint64_t 2
---
 extensions/standard-processors/processors/ExtractText.cpp | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 04e5e91..888ec47 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -161,8 +161,11 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
 
-    size_t maxCaptureSize;
-    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize);
+    const size_t maxCaptureSize = [this]{
+      uint64_t val;
+      ctx_->getProperty(MaxCaptureGroupLen.getName(), val);
+      return size_t{val};
+    }();
 
     std::string contentStr = contentStream.str();
 

[nifi-minifi-cpp] 01/17: convert InputStream::read to size_t

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 5c41fff9c012860cebb8c830c38057b93efef7dc
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Fri Mar 12 14:10:55 2021 +0100

    convert InputStream::read to size_t
---
 extensions/aws/processors/PutS3Object.h            | 11 ++--
 extensions/civetweb/processors/ListenHTTP.h        | 17 +++---
 extensions/http-curl/client/HTTPCallback.h         |  2 +-
 extensions/http-curl/client/HTTPStream.cpp         |  9 ++--
 extensions/http-curl/client/HTTPStream.h           |  4 +-
 extensions/http-curl/tests/CivetStream.h           |  8 ++-
 extensions/http-curl/tests/HTTPHandlers.h          |  9 ++--
 extensions/jni/jvm/JniReferenceObjects.h           |  3 +-
 extensions/libarchive/CompressContent.h            | 49 +++++++++--------
 extensions/libarchive/FocusArchiveEntry.cpp        |  9 ++--
 extensions/librdkafka/PublishKafka.cpp             |  7 ++-
 extensions/mqtt/processors/ConvertJSONAck.h        | 13 ++---
 extensions/mqtt/processors/ConvertUpdate.cpp       |  2 +-
 extensions/mqtt/processors/PublishMQTT.h           | 18 ++++---
 extensions/opc/src/putopc.cpp                      | 14 ++---
 extensions/rocksdb-repos/RocksDbStream.cpp         | 35 +++++-------
 extensions/rocksdb-repos/RocksDbStream.h           |  4 +-
 extensions/script/python/PyBaseStream.cpp          |  8 ++-
 extensions/script/python/PyBaseStream.h            |  2 +-
 extensions/sftp/client/SFTPClient.cpp              | 14 ++---
 .../standard-processors/processors/ExtractText.cpp | 22 ++++----
 .../standard-processors/processors/LogAttribute.h  | 20 ++++---
 .../standard-processors/processors/PutFile.cpp     | 25 ++++-----
 .../standard-processors/processors/PutFile.h       |  2 +-
 libminifi/include/io/AtomicEntryStream.h           | 13 +++--
 libminifi/include/io/BufferStream.h                |  6 +--
 libminifi/include/io/CRCStream.h                   |  6 +--
 libminifi/include/io/ClientSocket.h                |  4 +-
 libminifi/include/io/DescriptorStream.h            |  4 +-
 libminifi/include/io/FileStream.h                  |  4 +-
 libminifi/include/io/InputStream.h                 | 16 +++---
 libminifi/include/io/Stream.h                      |  2 +-
 libminifi/include/io/StreamPipe.h                  | 12 ++---
 libminifi/include/io/tls/SecureDescriptorStream.h  |  4 +-
 libminifi/include/io/tls/TLSSocket.h               |  4 +-
 libminifi/include/provenance/Provenance.h          | 16 +++---
 libminifi/include/sitetosite/Peer.h                |  2 +-
 libminifi/include/sitetosite/SiteToSiteClient.h    | 55 ++++++++-----------
 libminifi/include/utils/ByteArrayCallback.h        |  7 +--
 libminifi/include/utils/Enum.h                     |  4 +-
 libminifi/include/utils/FileOutputCallback.h       |  2 +-
 libminifi/src/FlowFileRecord.cpp                   | 14 +++--
 libminifi/src/c2/ControllerSocketProtocol.cpp      | 63 +++++++++++-----------
 libminifi/src/core/ProcessSession.cpp              | 10 ++--
 libminifi/src/core/ProcessSessionReadCallback.cpp  | 13 ++---
 libminifi/src/io/BufferStream.cpp                  | 15 +++---
 libminifi/src/io/ClientSocket.cpp                  | 21 ++++----
 libminifi/src/io/DescriptorStream.cpp              | 15 +++---
 libminifi/src/io/FileStream.cpp                    | 22 ++++----
 libminifi/src/io/InputStream.cpp                   | 27 +++++-----
 libminifi/src/io/tls/SecureDescriptorStream.cpp    | 41 +++++++-------
 libminifi/src/io/tls/TLSSocket.cpp                 | 36 ++++++-------
 libminifi/src/provenance/Provenance.cpp            | 32 ++++++-----
 libminifi/src/sitetosite/RawSocketProtocol.cpp     | 35 ++++++------
 libminifi/src/sitetosite/SiteToSiteClient.cpp      | 35 ++++++------
 libminifi/src/utils/ByteArrayCallback.cpp          | 10 ++--
 libminifi/src/utils/FileOutputCallback.cpp         |  2 +-
 libminifi/test/BufferReader.h                      | 10 ++--
 .../test/archive-tests/CompressContentTests.cpp    | 40 ++++++--------
 libminifi/test/archive-tests/MergeFileTests.cpp    | 10 ++--
 .../test/rocksdb-tests/ContentSessionTests.cpp     |  9 ++--
 .../rocksdb-tests/DBContentRepositoryTests.cpp     |  4 +-
 .../test/rocksdb-tests/RocksDBStreamTests.cpp      |  2 +-
 libminifi/test/unit/FileStreamTests.cpp            | 28 +++++-----
 libminifi/test/unit/SiteToSiteHelper.h             |  2 +-
 nanofi/src/api/nanofi.cpp                          |  2 +-
 nanofi/tests/CSite2SiteTests.cpp                   | 11 ++--
 67 files changed, 458 insertions(+), 519 deletions(-)

diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index ad3072a..e55ac1e 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -31,6 +31,7 @@
 
 #include "S3Processor.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
 
 template<typename T>
 class S3TestsFixture;
@@ -94,20 +95,20 @@ class PutS3Object : public S3Processor {
       auto data_stream = std::make_shared<std::stringstream>();
       read_size_ = 0;
       while (read_size_ < flow_size_) {
-        auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
-        int read_ret = stream->read(buffer, next_read_size);
-        if (read_ret < 0) {
+        const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
+        const auto read_ret = stream->read(buffer.data(), next_read_size);
+        if (read_ret == static_cast<size_t>(-1)) {
           return -1;
         }
         if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
+          data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
           read_size_ += read_ret;
         } else {
           break;
         }
       }
       result_ = s3_wrapper_.putObject(options_, data_stream);
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
 
     uint64_t flow_size_;
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index adf4a76..960185c 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -32,6 +32,7 @@
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/MinifiConcurrentQueue.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -48,7 +49,7 @@ class ListenHTTP : public core::Processor {
   /*!
    * Create a new processor
    */
-  ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
+  explicit ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
       : Processor(name, uuid),
         logger_(logging::LoggerFactory<ListenHTTP>::getLogger()),
         batch_size_(0) {
@@ -56,7 +57,7 @@ class ListenHTTP : public core::Processor {
     callbacks_.log_access = &logAccess;
   }
   // Destructor
-  virtual ~ListenHTTP();
+  ~ListenHTTP() override;
   // Processor Name
   static constexpr char const *ProcessorName = "ListenHTTP";
   // Supported Properties
@@ -131,14 +132,12 @@ class ListenHTTP : public core::Processor {
     }
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       out_str_->resize(stream->size());
-      uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
-                                           gsl::narrow<int>(stream->size()));
-
+      const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size());
       if (num_read != stream->size()) {
         throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
       }
 
-      return num_read;
+      return gsl::narrow<int64_t>(num_read);
     }
 
    private:
@@ -148,7 +147,7 @@ class ListenHTTP : public core::Processor {
   // Write callback for transferring data from HTTP request to content repo
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(std::unique_ptr<io::BufferStream>);
+    explicit WriteCallback(std::unique_ptr<io::BufferStream>);
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
    private:
@@ -159,11 +158,11 @@ class ListenHTTP : public core::Processor {
     try {
       struct mg_context* ctx = mg_get_context(conn);
       /* CivetServer stores 'this' as the userdata when calling mg_start */
-      CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
+      auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx));
       if (server == nullptr) {
         return 0;
       }
-      std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
+      auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
       if (logger == nullptr) {
         return 0;
       }
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 921a8c5..242990e 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -83,7 +83,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
 
     if (stream->size() > 0) {
       vec.resize(stream->size());
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
     return processInner(std::move(vec));
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 5ba7f3f..f9b8f49 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -51,7 +51,7 @@ void HttpStream::close() {
   http_read_callback_.close();
 }
 
-void HttpStream::seek(uint64_t /*offset*/) {
+void HttpStream::seek(size_t /*offset*/) {
   // seek is an unnecessary part of this implementatino
   throw std::logic_error{"HttpStream::seek is unimplemented"};
 }
@@ -81,8 +81,7 @@ int HttpStream::write(const uint8_t *value, int size) {
   }
 }
 
-int HttpStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t HttpStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -97,10 +96,10 @@ int HttpStream::read(uint8_t *buf, int buflen) {
         started_ = true;
       }
     }
-    return gsl::narrow<int>(http_read_callback_.readFully((char*) buf, buflen));
+    return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
 
   } else {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index 711d1d2..8d49d02 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -77,7 +77,7 @@ class HttpStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return written;
@@ -91,7 +91,7 @@ class HttpStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 9cbba27..e333b54 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -28,6 +28,8 @@
 #include "io/BaseStream.h"
 #include "civetweb.h"
 #include "CivetServer.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -49,8 +51,10 @@ class CivetStream : public io::InputStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override {
-    return mg_read(conn, buf, buflen);
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = mg_read(conn, buf, buflen);
+    if (ret < 0) return static_cast<size_t>(-1);
+    return gsl::narrow_cast<size_t>(ret);
   }
 
  protected:
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index cedb834..5c49205 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -214,7 +214,7 @@ class FlowFileResponder : public ServerAwareHandler {
       minifi::io::CivetStream civet_stream(conn);
       minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
       uint32_t num_attributes;
-      int read;
+      size_t read;
       uint64_t total_size = 0;
       read = stream.read(num_attributes);
       if(!isServerRunning())return false;
@@ -241,9 +241,10 @@ class FlowFileResponder : public ServerAwareHandler {
       flow->data.resize(gsl::narrow<size_t>(length));
       flow->total_size = total_size;
 
-      read = stream.read(flow->data.data(), gsl::narrow<int>(length));
-      if(!isServerRunning())return false;
-      assert(read == gsl::narrow<int>(length));
+      read = stream.read(flow->data.data(), length);
+      if (!isServerRunning()) return false;
+      (void)read;
+      assert(read == length);
 
       if (!invalid_checksum) {
         site2site_rest_resp = std::to_string(stream.getCRC());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 67798e9..f81391b 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -142,7 +142,8 @@ class JniByteInputStream : public minifi::InputStreamCallback {
     int writtenOffset = 0;
     int read = 0;
     do {
-      int actual = stream_->read(buffer_, std::min(remaining, buffer_size_));
+      // JNI takes size as int, there's not much we can do here to support 2GB+ sizes
+      int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_)));
       if (actual <= 0) {
         if (read == 0) {
           stream_ = nullptr;
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index fc7851e..df7fc75 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -21,6 +21,7 @@
 #define __COMPRESS_CONTENT_H__
 
 #include <cinttypes>
+#include <utility>
 
 #include "archive_entry.h"
 #include "archive.h"
@@ -56,7 +57,7 @@ public:
     , encapsulateInTar_(false) {
   }
   // Destructor
-  virtual ~CompressContent() = default;
+  ~CompressContent() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "CompressContent";
   // Supported Properties
@@ -94,8 +95,8 @@ public:
     ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
         flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
     }
-    ~ReadCallbackCompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallbackCompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       uint8_t buffer[4096U];
       int64_t ret = 0;
       uint64_t read_size = 0;
@@ -107,13 +108,13 @@ public:
         return -1;
       }
       while (read_size < flow_->getSize()) {
-        ret = stream->read(buffer, sizeof(buffer));
-        if (ret < 0) {
+        const auto readret = stream->read(buffer, sizeof(buffer));
+        if (readret == static_cast<size_t>(-1)) {
           status_ = -1;
           return -1;
         }
-        if (ret > 0) {
-          ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
+        if (readret > 0) {
+          ret = archive_write_data(arch_, buffer, readret);
           if (ret < 0) {
             logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
             status_ = -1;
@@ -124,7 +125,7 @@ public:
           break;
         }
       }
-      return read_size;
+      return gsl::narrow<int64_t>(read_size);
     }
     std::shared_ptr<core::FlowFile> flow_;
     struct archive *arch_;
@@ -135,25 +136,23 @@ public:
   // Nest Callback Class for read stream from flow for decompress
   class ReadCallbackDecompress: public InputStreamCallback {
   public:
-    ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
-        read_size_(0), offset_(0), flow_(flow) {
-      origin_offset_ = flow_->getOffset();
+    explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
+        flow_(std::move(flow)) {
     }
-    ~ReadCallbackDecompress() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallbackDecompress() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       read_size_ = 0;
       stream->seek(offset_);
-      int readRet = stream->read(buffer_, sizeof(buffer_));
+      const auto readRet = stream->read(buffer_, sizeof(buffer_));
       read_size_ = readRet;
-      if (readRet > 0) {
-        offset_ += read_size_;
+      if (readRet != static_cast<size_t>(-1)) {
+        offset_ += readRet;
       }
-      return readRet;
+      return gsl::narrow<int64_t>(readRet);
     }
-    int64_t read_size_;
-    uint8_t buffer_[8192];
-    uint64_t offset_;
-    uint64_t origin_offset_;
+    size_t read_size_ = 0;
+    uint8_t buffer_[8192] = {0};
+    size_t offset_ = 0;
     std::shared_ptr<core::FlowFile> flow_;
   };
   // Nest Callback Class for write stream
@@ -383,8 +382,8 @@ public:
           std::vector<uint8_t> buffer(16 * 1024U);
           int64_t read_size = 0;
           while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
-            int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-            if (ret < 0) {
+            const auto ret = inputStream->read(buffer.data(), buffer.size());
+            if (ret == static_cast<size_t>(-1)) {
               return -1;
             } else if (ret == 0) {
               break;
@@ -414,7 +413,7 @@ public:
 
       success_ = filterStream->isFinished();
 
-      return flow_->getSize();
+      return gsl::narrow<int64_t>(flow_->getSize());
     }
   };
 
@@ -440,7 +439,7 @@ private:
   void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
 
   std::shared_ptr<logging::Logger> logger_;
-  int compressLevel_;
+  int compressLevel_{};
   CompressionMode compressMode_;
   ExtendedCompressionFormat compressFormat_;
   bool updateFileName_;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 59a43e0..9452dce 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -34,6 +34,7 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "Exception.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -150,20 +151,20 @@ typedef struct {
 la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
   auto data = static_cast<FocusArchiveEntryReadData *>(d);
   *buf = data->buf;
-  int read = 0;
-  int last_read = 0;
+  size_t read = 0;
+  size_t last_read = 0;
 
   do {
     last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
     read += last_read;
-  } while (data->processor->isRunning() && last_read > 0 && read < 8196);
+  } while (data->processor->isRunning() && last_read > 0 && last_read != static_cast<size_t>(-1) && read < 8196);
 
   if (!data->processor->isRunning()) {
     archive_set_error(a, EINTR, "Processor shut down during read");
     return -1;
   }
 
-  return read;
+  return gsl::narrow<la_ssize_t>(read);
 }
 
 int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b83d028..bbb66a8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback {
     }
 
     for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
-      const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
-      if (readRet < 0) {
+      const auto readRet = stream->read(buffer.data(), buffer.size());
+      if (readRet == static_cast<size_t>(-1)) {
         status_ = -1;
         error_ = "Failed to read from stream";
         return read_size_;
       }
-
-      if (readRet <= 0) { break; }
+      if (readRet == 0) { break; }
 
       const auto err = produce(segment_num, buffer, readRet);
       if (err) {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 47f53d0..659af59 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -31,6 +31,8 @@
 #include "MQTTClient.h"
 #include "c2/protocols/RESTProtocol.h"
 #include "ConvertBase.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase {
         logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
   }
   // Destructor
-  virtual ~ConvertJSONAck() = default;
+  ~ConvertJSONAck() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "ConvertJSONAck";
 
@@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase {
   class ReadCallback : public InputStreamCallback {
    public:
     ReadCallback() = default;
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      int64_t ret = 0;
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (nullptr == stream)
         return 0;
       buffer_.resize(stream->size());
-      ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
-      return ret;
+      const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
+      return gsl::narrow<int64_t>(ret);
     }
     std::vector<char> buffer_;
   };
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index ca5a8d5..69da1fc 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
   bool received_update = false;
   while (mqtt_service_->get(100, listening_topic, update)) {
     // first we have the input topic string followed by the update URI
-    if (update.size() > 0) {
+    if (!update.empty()) {
 
       io::BufferStream stream(update.data(), update.size());
 
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index fdad09f..ba14c0f 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -20,6 +20,8 @@
 #ifndef __PUBLISH_MQTT_H__
 #define __PUBLISH_MQTT_H__
 
+#include <limits>
+
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -50,7 +52,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
     max_seg_size_ = ULLONG_MAX;
   }
   // Destructor
-  virtual ~PublishMQTT() = default;
+  ~PublishMQTT() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "PublishMQTT";
   // Supported Properties
@@ -74,18 +76,20 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
       status_ = 0;
       read_size_ = 0;
     }
-    ~ReadCallback() = default;
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+    ~ReadCallback() override = default;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
       if (flow_size_ < max_seg_size_)
         max_seg_size_ = flow_size_;
+      gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
       std::vector<unsigned char> buffer(max_seg_size_);
       read_size_ = 0;
       status_ = 0;
       while (read_size_ < flow_size_) {
-        int readRet = stream->read(&buffer[0], max_seg_size_);
+        // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
+        int readRet = static_cast<int>(stream->read(&buffer[0], max_seg_size_));
         if (readRet < 0) {
           status_ = -1;
-          return read_size_;
+          return gsl::narrow<int64_t>(read_size_);
         }
         if (readRet > 0) {
           MQTTClient_message pubmsg = MQTTClient_message_initializer;
@@ -97,12 +101,12 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
             status_ = -1;
             return -1;
           }
-          read_size_ += readRet;
+          read_size_ += gsl::narrow<size_t>(readRet);
         } else {
           break;
         }
       }
-      return read_size_;
+      return gsl::narrow<int64_t>(read_size_);
     }
     uint64_t flow_size_;
     uint64_t max_seg_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 1f3fe63..e613d1b 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -428,21 +428,15 @@ namespace processors {
     uint64_t size = 0;
 
     do {
-      int read = stream->read(buf_.data() + size, 1024);
-
-      if (read < 0) {
-        return -1;
-      }
-
-      if (read == 0) {
-        break;
-      }
+      const auto read = stream->read(buf_.data() + size, 1024);
+      if (read == static_cast<size_t>(-1)) return -1;
+      if (read == 0) break;
       size += read;
     } while (size < stream->size());
 
     logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size());
 
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 
 } /* namespace processors */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 86daa20..27b342e 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
 void RocksDbStream::close() {
 }
 
-void RocksDbStream::seek(uint64_t /*offset*/) {
+void RocksDbStream::seek(size_t /*offset*/) {
   // noop
 }
 
@@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) {
   }
 }
 
-int RocksDbStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  if (!exists_) {
-    return -1;
-  }
-  if (buflen == 0) {
-    return 0;
-  }
-  if (!IsNullOrEmpty(buf)) {
-    size_t amtToRead = gsl::narrow<size_t>(buflen);
-    if (offset_ >= value_.size()) {
-      return 0;
-    }
-    if (amtToRead > value_.size() - offset_) {
-      amtToRead = value_.size() - offset_;
-    }
-    std::memcpy(buf, value_.data() + offset_, amtToRead);
-    offset_ += amtToRead;
-    return gsl::narrow<int>(amtToRead);
-  } else {
-    return -1;
-  }
+size_t RocksDbStream::read(uint8_t *buf, size_t buflen) {
+  // The check have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed
+  if (!exists_) return static_cast<size_t>(-1);
+  if (buflen == 0) return 0;
+  if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+  if (offset_ >= value_.size()) return 0;
+
+  const auto amtToRead = std::min(buflen, value_.size() - offset_);
+  std::memcpy(buf, value_.data() + offset_, amtToRead);
+  offset_ += amtToRead;
+  return amtToRead;
 }
 
 } /* namespace io */
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 8e47b51..d051802 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return size_;
@@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index fcde480..1733994 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) {
 
   std::vector<uint8_t> buffer(len);
 
-  auto read = stream_->read(buffer.data(), static_cast<int>(len));
-  auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
-
-  return result;
+  const auto read = stream_->read(buffer.data(), len);
+  return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
 }
 
-size_t PyBaseStream::write(py::bytes buf) {
+size_t PyBaseStream::write(const py::bytes& buf) {
   const auto &&buf_str = buf.operator std::string();
   return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())),
                                                 static_cast<int>(buf_str.length())));
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h
index f07fe80..1f11065 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyBaseStream.h
@@ -37,7 +37,7 @@ class PyBaseStream {
 
   py::bytes read();
   py::bytes read(size_t len = 0);
-  size_t write(py::bytes buf);
+  size_t write(const py::bytes& buf);
 
  private:
   std::shared_ptr<io::BaseStream> stream_;
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index e657d2e..8d528ab 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     return true;
   }
 
-  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE);
+  const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
   std::vector<uint8_t> buf(buf_size);
   uint64_t total_read = 0U;
   do {
-    int read_ret = input.read(buf.data(), buf.size());
-    if (read_ret < 0) {
+    const auto read_ret = input.read(buf.data(), buf.size());
+    if (read_ret == static_cast<size_t>(-1)) {
       last_error_.setLibssh2Error(LIBSSH2_FX_OK);
       logger_->log_error("Error while reading input");
       return false;
@@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
     }
     logger_->log_trace("Read %d bytes", read_ret);
     total_read += read_ret;
-    ssize_t remaining = read_ret;
+    auto remaining = gsl::narrow<size_t>(read_ret);
     while (remaining > 0) {
-      int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
+      const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
       if (write_ret < 0) {
         last_error_.setSftpError(SFTPError::IoError);
         logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
         return false;
       }
       logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
-      remaining -= write_ret;
+      remaining -= gsl::narrow<size_t>(write_ret);
     }
   } while (true);
 
-  if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
+  if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
     last_error_.setLibssh2Error(LIBSSH2_FX_OK);
     logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
     return false;
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 345b84a..04e5e91 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
 }
 
 int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
-  int64_t ret = 0;
-  uint64_t read_size = 0;
+  size_t read_size = 0;
   bool regex_mode;
-  uint64_t size_limit = flowFile_->getSize();
+  size_t size_limit = flowFile_->getSize();
 
   std::string attrKey, sizeLimitStr;
   ctx_->getProperty(Attribute.getName(), attrKey);
@@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   if (sizeLimitStr.empty())
     size_limit = DEFAULT_SIZE_LIMIT;
   else if (sizeLimitStr != "0")
-    size_limit = std::stoi(sizeLimitStr);
+    size_limit = gsl::narrow_cast<size_t>(std::stoi(sizeLimitStr));
 
   std::ostringstream contentStream;
 
   while (read_size < size_limit) {
     // Don't read more than config limit or the size of the buffer
-    int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size()));
-    ret = stream->read(buffer_, length);
+    const auto length = std::min(size_limit - read_size, buffer_.size());
+    const auto ret = stream->read(buffer_, length);
 
-    if (ret < 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return -1;  // Stream error
     } else if (ret == 0) {
       break;  // End of stream, no more data
     }
 
-    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret);
+    contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
     read_size += ret;
     if (contentStream.fail()) {
       return -1;
@@ -162,9 +161,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     bool repeatingcapture;
     ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
 
-    int maxCaptureSizeProperty;
-    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty);
-    size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty);
+    size_t maxCaptureSize;
+    ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize);
 
     std::string contentStr = contentStream.str();
 
@@ -212,7 +210,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   } else {
     flowFile_->setAttribute(attrKey, contentStream.str());
   }
-  return read_size;
+  return gsl::narrow<int64_t>(read_size);
 }
 
 ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx,  std::shared_ptr<logging::Logger> lgr)
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 6d4b2ef..7a7d920 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -47,14 +47,14 @@ class LogAttribute : public core::Processor {
    * Create a new processor
    */
   explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {})
-      : Processor(std::move(name), uuid),
+      : Processor(name, uuid),
         flowfiles_to_log_(1),
         hexencode_(false),
         max_line_length_(80U),
         logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
   }
   // Destructor
-  virtual ~LogAttribute() = default;
+  ~LogAttribute() override = default;
   // Processor Name
   static constexpr char const* ProcessorName = "LogAttribute";
   // Supported Properties
@@ -103,16 +103,14 @@ class LogAttribute : public core::Processor {
         : logger_(std::move(logger))
         , buffer_(size)  {
     }
-    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
-      if (buffer_.size() == 0U) {
-        return 0U;
-      }
-      int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size()));
-      if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) {
-        logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret);
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+      if (buffer_.empty()) return 0U;
+      const auto ret = stream->read(buffer_.data(), buffer_.size());
+      if (ret != buffer_.size()) {
+        logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
         throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
       }
-      return buffer_.size();
+      return gsl::narrow<int64_t>(buffer_.size());
     }
     std::shared_ptr<logging::Logger> logger_;
     std::vector<uint8_t> buffer_;
@@ -123,7 +121,7 @@ class LogAttribute : public core::Processor {
   // OnTrigger method, implemented by NiFi LogAttribute
   void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi LogAttribute
-  void initialize(void) override;
+  void initialize() override;
 
  private:
   uint64_t flowfiles_to_log_;
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 3765856..2060867 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -25,10 +25,12 @@
 #include <memory>
 #include <string>
 #include <set>
+#include <utility>
 #ifdef WIN32
 #include <Windows.h>
 #endif
 #include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
 }
 #endif
 
-PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
-    : tmp_file_(tmp_file),
-      dest_file_(dest_file) {
+PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
+    : tmp_file_(std::move(tmp_file)),
+      dest_file_(std::move(dest_file)) {
 }
 
 // Copy the entire file contents to the temporary file
@@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
   std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
 
   do {
-    int read = stream->read(buffer, 1024);
-
-    if (read < 0) {
-      return -1;
-    }
-
-    if (read == 0) {
-      break;
-    }
-
-    tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+    const auto read = stream->read(buffer, 1024);
+    if (read == static_cast<size_t>(-1)) return -1;
+    if (read == 0) break;
+    tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
     size += read;
   } while (size < stream->size());
 
@@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
     write_succeeded_ = true;
   }
 
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index c09bffb..487213b 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -82,7 +82,7 @@ class PutFile : public core::Processor {
 
   class ReadCallback : public InputStreamCallback {
    public:
-    ReadCallback(const std::string &tmp_file, const std::string &dest_file);
+    ReadCallback(std::string tmp_file, std::string dest_file);
     ~ReadCallback() override;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
     bool commit();
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 75b2d4c..e33b3c5 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
@@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() {
 }
 
 template<typename T>
-void AtomicEntryStream<T>::seek(uint64_t offset) {
+void AtomicEntryStream<T>::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(entry_lock_);
   offset_ = gsl::narrow<size_t>(offset);
 }
@@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
 }
 
 template<typename T>
-int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (nullptr != buf && !invalid_stream_) {
     std::lock_guard<std::recursive_mutex> lock(entry_lock_);
-    int len = buflen;
+    auto len = buflen;
     core::repository::RepoValue<T> *value;
     if (entry_->getValue(key_, &value)) {
       if (offset_ + len > value->getBufferSize()) {
@@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
       return len;
     }
   }
-  return -1;
+  return static_cast<size_t>(-1);
 }
 
 }  // namespace io
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 3f0bfdc..7ab2b9e 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -35,7 +35,7 @@ class BufferStream : public BaseStream {
  public:
   BufferStream() = default;
 
-  BufferStream(const uint8_t *buf, const unsigned int len) {
+  BufferStream(const uint8_t *buf, const size_t len) {
     write(buf, len);
   }
 
@@ -48,7 +48,7 @@ class BufferStream : public BaseStream {
 
   int write(const uint8_t* data, int len) final;
 
-  int read(uint8_t* buffer, int len) override;
+  size_t read(uint8_t* buffer, size_t len) override;
 
   int initialize() override {
     buffer_.clear();
@@ -56,7 +56,7 @@ class BufferStream : public BaseStream {
     return 0;
   }
 
-  void seek(uint64_t offset) override {
+  void seek(size_t offset) override {
     readOffset_ += offset;
   }
 
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 2204537..abe0762 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
  public:
   using InputStream::read;
 
-  int read(uint8_t *buf, int buflen) override {
-    int ret = child_stream_->read(buf, buflen);
-    if (ret > 0) {
+  size_t read(uint8_t *buf, size_t buflen) override {
+    const auto ret = child_stream_->read(buf, buflen);
+    if (ret > 0 && ret != static_cast<size_t>(-1)) {
       crc_ = crc32(crc_, buf, ret);
     }
     return ret;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 37626c8..398a1d5 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -159,7 +159,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  int read(uint8_t *buf, int buflen) override {
+  size_t read(uint8_t *buf, size_t buflen) override {
     return read(buf, buflen, true);
   }
 
@@ -169,7 +169,7 @@ class Socket : public BaseStream {
    * @param buflen
    * @param retrieve_all_bytes determines if we should read all bytes before returning
    */
-  virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+  virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes);
 
  protected:
   /**
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index a7ea713..ee5aec7 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 3156a30..ad7f0dc 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -63,7 +63,7 @@ class FileStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return length_;
@@ -77,7 +77,7 @@ class FileStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 60a297c..cebf266 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -39,32 +39,32 @@ class InputStream : public virtual Stream {
    * reads a byte array from the stream
    * @param value reference in which will set the result
    * @param len length to read
-   * @return resulting read size
+   * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) for ClientSocket EAGAIN
    **/
-  virtual int read(uint8_t *value, int len) = 0;
+  virtual size_t read(uint8_t *value, size_t len) = 0;
 
-  int read(std::vector<uint8_t>& buffer, int len);
+  size_t read(std::vector<uint8_t>& buffer, size_t len);
 
   /**
    * read string from stream
    * @param str reference string
    * @return resulting read size
    **/
-  int read(std::string &str, bool widen = false);
+  size_t read(std::string &str, bool widen = false);
 
   /**
    * read a bool from stream
    * @param value reference to the output
    * @return resulting read size
    **/
-  int read(bool& value);
+  size_t read(bool& value);
 
   /**
    * read a uuid from stream
    * @param value reference to the output
    * @return resulting read size
    **/
-  int read(utils::Identifier& value);
+  size_t read(utils::Identifier& value);
 
   /**
   * reads sizeof(Integral) bytes from the stream
@@ -72,10 +72,10 @@ class InputStream : public virtual Stream {
   * @return resulting read size
   **/
   template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
-  int read(Integral& value) {
+  size_t read(Integral& value) {
     uint8_t buf[sizeof(Integral)]{};
     if (read(buf, sizeof(Integral)) != sizeof(Integral)) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
 
     value = 0;
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e067b52..cb528e5 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -31,7 +31,7 @@ class Stream {
  public:
   virtual void close() {}
 
-  virtual void seek(uint64_t /*offset*/) {
+  virtual void seek(size_t /*offset*/) {
     throw std::runtime_error("Seek is not supported");
   }
 
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b3fafce..8c88568 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
   uint8_t buffer[4096U];
   int64_t totalTransferred = 0;
   while (true) {
-    int readRet = src->read(buffer, sizeof(buffer));
-    if (readRet < 0) {
-      return readRet;
-    }
-    if (readRet == 0) {
-      break;
-    }
-    int remaining = readRet;
+    const auto readRet = src->read(buffer, sizeof(buffer));
+    if (readRet == static_cast<size_t>(-1)) return -1;
+    if (readRet == 0) break;
+    auto remaining = readRet;
     int transferred = 0;
     while (remaining > 0) {
       int writeRet = dst->write(buffer + transferred, remaining);
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 7699910..5f285c1 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * Skip to the specified offset.
    * @param offset offset to which we will skip
    */
-  void seek(uint64_t offset) override;
+  void seek(size_t offset) override;
 
   size_t size() const override {
     return -1;
@@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream {
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * writes value to stream
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index 6bf741d..24df1d9 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -136,14 +136,14 @@ class TLSSocket : public Socket {
   using Socket::read;
   using Socket::write;
 
-  int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override;
+  size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override;
 
   /**
    * Reads data and places it into buf
    * @param buf buffer in which we extract data
    * @param buflen
    */
-  int read(uint8_t *buf, int buflen) override;
+  size_t read(uint8_t *buf, size_t buflen) override;
 
   /**
    * Write value to the stream using uint8_t ptr
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 3b7b4f7..60ee8c2 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
   uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
     const auto size = std::min<size_t>(72, bufferSize);
-    org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size));
+    org::apache::nifi::minifi::io::BufferStream outStream(buffer, size);
 
     std::string uuid;
-    int ret = outStream.read(uuid);
-
-    if (ret <= 0) {
+    const auto uuidret = outStream.read(uuid);
+    if (uuidret <= 0) {
       return 0;
     }
 
     uint32_t eventType;
-    ret = outStream.read(eventType);
-    if (ret != 4) {
+    const auto typeret = outStream.read(eventType);
+    if (typeret != 4) {
       return 0;
     }
 
     uint64_t event_time;
-
-    ret = outStream.read(event_time);
-    if (ret != 8) {
+    const auto timeret = outStream.read(event_time);
+    if (timeret != 8) {
       return 0;
     }
 
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index a82e3b4..d2138ea 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
     return stream_->write(data, len);
   }
 
-  int read(uint8_t* data, int len) override {
+  size_t read(uint8_t* data, size_t len) override {
     return stream_->read(data, len);
   }
 
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index fc1dc91..7a41e81 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -19,6 +19,7 @@
 #ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 #define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
 
+#include <algorithm>
 #include <map>
 #include <memory>
 #include <string>
@@ -30,6 +31,7 @@
 #include "core/ProcessSession.h"
 #include "core/ProcessContext.h"
 #include "core/Connectable.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -65,11 +67,6 @@ class SiteToSiteClient : public core::Connectable {
         _batchSendNanos(5000000000),
         ssl_context_service_(nullptr),
         logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
-    _supportedVersion[0] = 5;
-    _supportedVersion[1] = 4;
-    _supportedVersion[2] = 3;
-    _supportedVersion[3] = 2;
-    _supportedVersion[4] = 1;
     _currentVersion = _supportedVersion[0];
     _currentVersionIndex = 0;
     _supportedCodecVersion[0] = 1;
@@ -77,7 +74,7 @@ class SiteToSiteClient : public core::Connectable {
     _currentCodecVersionIndex = 0;
   }
 
-  virtual ~SiteToSiteClient() = default;
+  ~SiteToSiteClient() override = default;
 
   void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
     ssl_context_service_ = context_service;
@@ -189,13 +186,13 @@ class SiteToSiteClient : public core::Connectable {
     return logger_;
   }
 
-  virtual void yield() {
+  void yield() override {
   }
 
   /**
    * Determines if we are connected and operating
    */
-  virtual bool isRunning() {
+  bool isRunning() override {
     return running_;
   }
 
@@ -203,7 +200,7 @@ class SiteToSiteClient : public core::Connectable {
    * Determines if work is available by this connectable
    * @return boolean if work is available.
    */
-  virtual bool isWorkAvailable() {
+  bool isWorkAvailable() override {
     return true;
   }
 
@@ -234,12 +231,12 @@ class SiteToSiteClient : public core::Connectable {
   virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
   // getRespondCodeContext
   virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
-    for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) {
-      if (SiteToSiteRequest::respondCodeContext[i].code == code) {
-        return &SiteToSiteRequest::respondCodeContext[i];
+    for (auto & i : SiteToSiteRequest::respondCodeContext) {
+      if (i.code == code) {
+        return &i;
       }
     }
-    return NULL;
+    return nullptr;
   }
 
   // Peer State
@@ -254,7 +251,7 @@ class SiteToSiteClient : public core::Connectable {
   // Peer Connection
   std::unique_ptr<SiteToSitePeer> peer_;
 
-  std::atomic<bool> running_;
+  std::atomic<bool> running_{false};
 
   // transaction map
   std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
@@ -265,10 +262,10 @@ class SiteToSiteClient : public core::Connectable {
   /***
    * versioning
    */
-  uint32_t _supportedVersion[5];
+  uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
   uint32_t _currentVersion;
   int _currentVersionIndex;
-  uint32_t _supportedCodecVersion[1];
+  uint32_t _supportedCodecVersion[1] = {1};
   uint32_t _currentCodecVersion;
   int _currentCodecVersionIndex;
 
@@ -286,13 +283,13 @@ class WriteCallback : public OutputStreamCallback {
   }
   DataPacket *_packet;
   // void process(std::ofstream *stream) {
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     uint8_t buffer[16384];
     uint64_t len = _packet->_size;
     uint64_t total = 0;
     while (len > 0) {
-      int size = len < 16384 ? static_cast<int>(len) : 16384;
-      int ret = _packet->transaction_->getStream().read(buffer, size);
+      const auto size = std::min(len, size_t{16384});
+      const auto ret = _packet->transaction_->getStream().read(buffer, size);
       if (ret != size) {
         logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
         return -1;
@@ -302,7 +299,7 @@ class WriteCallback : public OutputStreamCallback {
       total += size;
     }
     logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
-    return len;
+    return gsl::narrow<int64_t>(len);
   }
 };
 // Nest Callback Class for read stream
@@ -312,21 +309,15 @@ class ReadCallback : public InputStreamCallback {
       : _packet(packet) {
   }
   DataPacket *_packet;
-  int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
     _packet->_size = 0;
     uint8_t buffer[8192] = { 0 };
-    int readSize;
     size_t size = 0;
     do {
-      readSize = stream->read(buffer, 8192);
-
-      if (readSize == 0) {
-        break;
-      }
-      if (readSize < 0) {
-        return -1;
-      }
-      int ret = _packet->transaction_->getStream().write(buffer, readSize);
+      const auto readSize = stream->read(buffer, 8192);
+      if (readSize == 0) break;
+      if (readSize == static_cast<size_t>(-1)) return -1;
+      const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
       if (ret != readSize) {
         logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
         return -1;
@@ -334,7 +325,7 @@ class ReadCallback : public InputStreamCallback {
       size += readSize;
     } while (size < stream->size());
     _packet->_size = size;
-    return size;
+    return gsl::narrow<int64_t>(size);
   }
 };
 
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 84e3362..ace72b6 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,6 +24,7 @@
 #include "concurrentqueue.h"
 #include "FlowFileRecord.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback {
     if (stream->size() > 0) {
       vec.resize(stream->size());
 
-      stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+      stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
     }
 
-    return vec.size();
+    return gsl::narrow<int64_t>(vec.size());
   }
 
   virtual void seek(size_t) { }
@@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback {
 
   virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
 
-  virtual const std::vector<char> to_string();
+  virtual std::vector<char> to_string();
 
   virtual void close();
 
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index 440b158..00facc4 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -72,8 +72,8 @@ namespace utils {
 
 #define SMART_ENUM_BODY(Clazz, ...) \
     constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
-    Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
-    Clazz(const char* str) : value_{parse(str).value_} {} \
+    explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+    explicit Clazz(const char* str) : value_{parse(str).value_} {} \
    private: \
     Type value_; \
    public: \
diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h
index 1343fae..ee68db5 100644
--- a/libminifi/include/utils/FileOutputCallback.h
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -51,7 +51,7 @@ class FileOutputCallback : public ByteOutputCallback {
 
   int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
 
-  const std::vector<char> to_string() override;
+  std::vector<char> to_string() override;
 
   void close() override;
 
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 813137b..047c571 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -161,11 +161,9 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
 }
 
 std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
-  int ret;
-
   auto file = std::make_shared<FlowFileRecord>();
 
-  ret = inStream.read(file->event_time_);
+  auto ret = inStream.read(file->event_time_);
   if (ret != 8) {
     return {};
   }
@@ -181,12 +179,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
   }
 
   ret = inStream.read(file->uuid_);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
   ret = inStream.read(container);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
@@ -200,12 +198,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     ret = inStream.read(key, true);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return {};
     }
     std::string value;
     ret = inStream.read(value, true);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return {};
     }
     file->attributes_[key] = value;
@@ -213,7 +211,7 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
 
   std::string content_full_path;
   ret = inStream.read(content_full_path);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return {};
   }
 
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 8322302..73b74ff 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -101,8 +101,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::START:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if ( size != static_cast<size_t>(-1) ) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->start();
@@ -115,8 +115,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::STOP:
         {
           std::string componentStr;
-          int size = stream->read(componentStr);
-          if ( size != -1 ) {
+          const auto size = stream->read(componentStr);
+          if ( size != static_cast<size_t>(-1) ) {
             auto components = update_sink_->getComponents(componentStr);
             for (const auto& component : components) {
               component->stop();
@@ -129,8 +129,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::CLEAR:
         {
           std::string connection;
-          int size = stream->read(connection);
-          if ( size != -1 ) {
+          const auto size = stream->read(connection);
+          if ( size != static_cast<size_t>(-1) ) {
             update_sink_->clearConnection(connection);
           }
         }
@@ -138,18 +138,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::UPDATE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
+          const auto size = stream->read(what);
+          if (size == static_cast<size_t>(-1)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "flow") {
             std::string ff_loc;
-            int size = stream->read(ff_loc);
+            const auto size = stream->read(ff_loc);
             std::ifstream tf(ff_loc);
             std::string configuration((std::istreambuf_iterator<char>(tf)),
                 std::istreambuf_iterator<char>());
-            if (size == -1) {
+            if (size == static_cast<size_t>(-1)) {
               logger_->log_debug("Connection broke");
               break;
             }
@@ -160,15 +160,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
         case Operation::DESCRIBE:
         {
           std::string what;
-          int size = stream->read(what);
-          if (size == -1) {
+          const auto size = stream->read(what);
+          if (size == static_cast<size_t>(-1)) {
             logger_->log_debug("Connection broke");
             break;
           }
           if (what == "queue") {
             std::string connection;
-            int size = stream->read(connection);
-            if (size == -1) {
+            const auto size_ = stream->read(connection);
+            if (size_ == static_cast<size_t>(-1)) {
               logger_->log_debug("Connection broke");
               break;
             }
@@ -184,8 +184,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "components") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
+            resp.write(size_);
             for (const auto &component : update_sink_->getAllComponents()) {
               resp.write(component->getComponentName());
               resp.write(component->isRunning() ? "true" : "false");
@@ -210,8 +210,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
           } else if (what == "connections") {
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t size = gsl::narrow<uint16_t>(queue_full_.size());
-            resp.write(size);
+            const auto size_ = gsl::narrow<uint16_t>(queue_full_.size());
+            resp.write(size_);
             for (const auto &connection : queue_full_) {
               resp.write(connection.first, false);
             }
@@ -220,17 +220,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
             std::vector<std::string> full_connections;
             {
               std::lock_guard<std::mutex> lock(controller_mutex_);
-              for (auto conn : queue_full_) {
-                if (conn.second == true) {
+              for (const auto& conn : queue_full_) {
+                if (conn.second) {
                   full_connections.push_back(conn.first);
                 }
               }
             }
             io::BufferStream resp;
             resp.write(&head, 1);
-            uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
+            const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
             resp.write(full_connection_count);
-            for (auto conn : full_connections) {
+            for (const auto& conn : full_connections) {
               resp.write(conn);
             }
             stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size()));
@@ -248,11 +248,11 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
 void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
   for (const auto &payload_content : content) {
     if (payload_content.name == "Components") {
-      for (auto content : payload_content.operation_arguments) {
+      for (const auto& content_ : payload_content.operation_arguments) {
         bool is_enabled = false;
-        minifi::utils::StringUtils::StringToBool(content.second.to_string(), is_enabled);
+        minifi::utils::StringUtils::StringToBool(content_.second.to_string(), is_enabled);
         std::lock_guard<std::mutex> lock(controller_mutex_);
-        component_map_[content.first] = is_enabled;
+        component_map_[content_.first] = is_enabled;
       }
     }
   }
@@ -261,21 +261,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse
 int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
   if (server_socket_ == nullptr)
     return 0;
-  const std::vector<C2ContentResponse> &content = payload.getContent();
   for (const auto &pc : payload.getNestedPayloads()) {
     if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") {
       for (const auto &metrics_payload : pc.getNestedPayloads()) {
         if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") {
           for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
-            auto metric_content = queue_metrics.getContent();
-            for (const auto &payload_content : queue_metrics.getContent()) {
+            const auto& metric_content = queue_metrics.getContent();
+            for (const auto &payload_content : metric_content) {
               uint64_t size = 0;
               uint64_t max = 0;
-              for (auto content : payload_content.operation_arguments) {
+              for (const auto& content : payload_content.operation_arguments) {
                 if (content.first == "datasize") {
-                  size = std::stol(content.second.to_string());
+                  size = std::stoull(content.second.to_string());
                 } else if (content.first == "datasizemax") {
-                  max = std::stol(content.second.to_string());
+                  max = std::stoull(content.second.to_string());
                 }
               }
               std::lock_guard<std::mutex> lock(controller_mutex_);
@@ -293,7 +292,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
     }
   }
 
-  parse_content(content);
+  parse_content(payload.getContent());
 
   std::vector<uint8_t> buffer;
   buffer.resize(1024);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index f8e8c31..6a4077e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -325,8 +325,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<
  *
  */
 void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
-  std::shared_ptr<ResourceClaim> claim = content_session_->create();
-  int max_read = getpagesize();
+  const std::shared_ptr<ResourceClaim> claim = content_session_->create();
+  const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
   std::vector<uint8_t> charBuffer(max_read);
 
   try {
@@ -336,10 +336,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
     if (nullptr == content_stream) {
       throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
     }
-    int position = 0;
-    const int max_size = gsl::narrow<int>(stream.size());
+    size_t position = 0;
+    const auto max_size = stream.size();
     while (position < max_size) {
-      const int read_size = std::min(max_read, max_size - position);
+      const auto read_size = std::min(max_read, max_size - position);
       stream.read(charBuffer, read_size);
 
       content_stream->write(charBuffer.data(), read_size);
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 8a21f73..d62fac8 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,6 +24,7 @@
 
 #include "core/logging/LoggerConfiguration.h"
 #include "io/BaseStream.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
   size_t size = 0;
   uint8_t buffer[8192];
   do {
-    int read = stream->read(buffer, 8192);
-    if (read < 0) {
-      return -1;
-    }
-    if (read == 0) {
-      break;
-    }
+    const auto read = stream->read(buffer, 8192);
+    if (read == static_cast<size_t>(-1)) return -1;
+    if (read == 0) break;
     if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
       return -1;
     }
     size += read;
   } while (size < stream->size());
   _writeSucceeded = true;
-  return size;
+  return gsl::narrow<int64_t>(size);
 }
 
 // Renames tmp file to final destination
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 6d3ebab..12220ef 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) {
   return size;
 }
 
-int BufferStream::read(uint8_t *buf, int len) {
-  gsl_Expects(len >= 0);
-  int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_);
-  len = std::min(len, bytes_available_in_buffer);
-  auto begin = buffer_.begin() + readOffset_;
-  std::copy(begin, begin + len, buf);
+size_t BufferStream::read(uint8_t *buf, size_t len) {
+  const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
+  const auto readlen = std::min(len, bytes_available_in_buffer);
+  auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+  std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 
   // increase offset for the next read
-  readOffset_ += len;
+  readOffset_ += readlen;
 
-  return len;
+  return readlen;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 6427447..cc65a8b 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -47,6 +47,8 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/file/FileUtils.h"
 #include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+
 namespace util = org::apache::nifi::minifi::utils;
 namespace mio = org::apache::nifi::minifi::io;
 
@@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) {
   return bytes;
 }
 
-int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
-  gsl_Expects(buflen >= 0);
-  int32_t total_read = 0;
+size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) {
+  size_t total_read = 0;
   while (buflen) {
     int16_t fd = select_descriptor(1000);
     if (fd < 0) {
@@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         logger_->log_debug("fd %d close %i", fd, buflen);
         utils::file::FileUtils::close(socket_file_descriptor_);
       }
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
+    const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
     logger_->log_trace("Recv call %d", bytes_read);
     if (bytes_read <= 0) {
       if (bytes_read == 0) {
@@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
         int err = WSAGetLastError();
         if (err == WSAEWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err);
 #else
         if (errno == EAGAIN || errno == EWOULDBLOCK) {
           // continue
-          return -2;
+          return static_cast<size_t>(-2);
         }
         logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno));
 
 #endif  // WIN32
       }
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    buflen -= bytes_read;
+    buflen -= gsl::narrow<size_t>(bytes_read);
     buf += bytes_read;
-    total_read += bytes_read;
+    total_read += gsl::narrow<size_t>(bytes_read);
     if (!retrieve_all_bytes) {
       break;
     }
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 2b5fc70..6330b5e 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd)
       logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
 }
 
-void DescriptorStream::seek(uint64_t offset) {
+void DescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int DescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t DescriptorStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
   if (!IsNullOrEmpty(buf)) {
 #ifdef WIN32
-    auto size_read = _read(fd_, buf, buflen);
+    const auto size_read = _read(fd_, buf, buflen);
 #else
-    auto size_read = ::read(fd_, buf, buflen);
+    const auto size_read = ::read(fd_, buf, buflen);
 #endif
 
     if (size_read < 0) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    return  size_read;
+    return gsl::narrow<size_t>(size_read);
 
   } else {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 4f759ae..c88f8dd 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -97,13 +97,13 @@ void FileStream::close() {
   file_stream_.reset();
 }
 
-void FileStream::seek(uint64_t offset) {
+void FileStream::seek(size_t offset) {
   std::lock_guard<std::mutex> lock(file_lock_);
   if (file_stream_ == nullptr || !file_stream_->is_open()) {
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
     return;
   }
-  offset_ = gsl::narrow<size_t>(offset);
+  offset_ = offset;
   file_stream_->clear();
   if (!file_stream_->seekg(offset_))
     logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG;
@@ -142,8 +142,7 @@ int FileStream::write(const uint8_t *value, int size) {
   }
 }
 
-int FileStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t FileStream::read(uint8_t *buf, size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
@@ -151,32 +150,31 @@ int FileStream::read(uint8_t *buf, int buflen) {
     std::lock_guard<std::mutex> lock(file_lock_);
     if (file_stream_ == nullptr || !file_stream_->is_open()) {
       logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
-      return -1;
+      return static_cast<size_t>(-1);
     }
-    file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+    file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen));
     if (file_stream_->eof() || file_stream_->fail()) {
       file_stream_->clear();
       seekToEndOfFile(READ_ERROR_MSG);
       auto tellg_result = file_stream_->tellg();
       if (tellg_result == std::streampos(-1)) {
         logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG;
-        return -1;
+        return static_cast<size_t>(-1);
       }
-      size_t len = gsl::narrow<size_t>(tellg_result);
+      const auto len = gsl::narrow<size_t>(tellg_result);
       size_t ret = len - offset_;
       offset_ = len;
       length_ = len;
       logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
-      return gsl::narrow<int>(ret);
+      return ret;
     } else {
       offset_ += buflen;
-      file_stream_->seekp(offset_);
+      file_stream_->seekp(gsl::narrow<std::streamoff>(offset_));
       return buflen;
     }
-
   } else {
     logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG;
-    return -1;
+    return static_cast<size_t>(-1);
   }
 }
 
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cdd101e..cd4166b 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -30,42 +30,43 @@ namespace nifi {
 namespace minifi {
 namespace io {
 
-int InputStream::read(std::vector<uint8_t>& buffer, int len) {
-  if (buffer.size() < gsl::narrow<size_t>(len)) {
+size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
+  if (buffer.size() < len) {
     buffer.resize(len);
   }
-  int ret = read(buffer.data(), len);
-  buffer.resize((std::max)(ret, 0));
+  const auto ret = read(buffer.data(), len);
+  if (ret == static_cast<size_t>(-1)) return ret;
+  buffer.resize((std::max)(ret, size_t{0}));
   return ret;
 }
 
-int InputStream::read(bool &value) {
+size_t InputStream::read(bool &value) {
   uint8_t buf = 0;
 
   if (read(&buf, 1) != 1) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   value = buf;
   return 1;
 }
 
-int InputStream::read(utils::Identifier &value) {
+size_t InputStream::read(utils::Identifier &value) {
   std::string uuidStr;
-  int ret = read(uuidStr);
+  const auto ret = read(uuidStr);
   if (ret < 0) {
     return ret;
   }
   auto optional_uuid = utils::Identifier::parse(uuidStr);
   if (!optional_uuid) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   value = optional_uuid.value();
   return ret;
 }
 
-int InputStream::read(std::string &str, bool widen) {
+size_t InputStream::read(std::string &str, bool widen) {
   uint32_t len = 0;
-  int ret = 0;
+  size_t ret = 0;
   if (!widen) {
     uint16_t shortLength = 0;
     ret = read(shortLength);
@@ -84,9 +85,9 @@ int InputStream::read(std::string &str, bool widen) {
   }
 
   std::vector<uint8_t> buffer(len);
-  uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
+  const auto bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
   if (bytes_read != len) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
 
   str = std::string(reinterpret_cast<const char*>(buffer.data()), len);
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c5b0b3f..66d23b5 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -34,7 +34,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl)
       logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) {
 }
 
-void SecureDescriptorStream::seek(uint64_t offset) {
+void SecureDescriptorStream::seek(size_t offset) {
   std::lock_guard<std::recursive_mutex> lock(file_lock_);
 #ifdef WIN32
   _lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -71,34 +71,29 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) {
   }
 }
 
-int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
+size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
   if (buflen == 0) {
     return 0;
   }
-  if (!IsNullOrEmpty(buf)) {
-    int total_read = 0;
-      int status = 0;
-      while (buflen) {
-        int sslStatus;
-        do {
-          status = SSL_read(ssl_, buf, buflen);
-          sslStatus = SSL_get_error(ssl_, status);
-        } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+  if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+  size_t total_read = 0;
+  uint8_t* writepos = buf;
+  while (buflen > total_read) {
+    int status;
+    int sslStatus;
+    do {
+      status = SSL_read(ssl_, writepos, gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+      sslStatus = SSL_get_error(ssl_, status);
+    } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
-        if (status < 0)
-              break;
+    if (status < 0)
+      break;
 
-        buflen -= status;
-        buf += status;
-        total_read += status;
-      }
-
-      return total_read;
-
-  } else {
-    return -1;
+    writepos += status;
+    total_read += gsl::narrow<size_t>(status);
   }
+
+  return total_read;
 }
 
 } /* namespace io */
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 776fa44..49d07ec 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -351,36 +351,37 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
   return -1;
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) {
+  size_t total_read = 0;
   int status = 0;
   int loc = 0;
   int16_t fd = select_descriptor(1000);
   if (fd < 0) {
     close();
-    return -1;
+    return static_cast<size_t>(-1);
   }
   auto fd_ssl = get_ssl(fd);
   if (IsNullOrEmpty(fd_ssl)) {
-    return -1;
+    return static_cast<size_t>(-1);
   }
   if (!SSL_pending(fd_ssl)) {
     return 0;
   }
   while (buflen) {
     if (fd <= 0) {
-      return -1;
+      return static_cast<size_t>(-1);
     }
     int sslStatus;
     do {
-      status = SSL_read(fd_ssl, buf + loc, buflen);
+      status = SSL_read(fd_ssl, buf + loc, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
 
-    buflen -= status;
+    if (status < 0) break;
+
+    buflen -= gsl::narrow<size_t>(status);
     loc += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
@@ -418,33 +419,32 @@ int TLSSocket::write(const uint8_t *value, int size) {
   return writeData(value, size, fd);
 }
 
-int TLSSocket::read(uint8_t *buf, int buflen) {
-  gsl_Expects(buflen >= 0);
-  int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
+  size_t total_read = 0;
   int status = 0;
   while (buflen) {
-    int16_t fd = select_descriptor(1000);
+    const int16_t fd = select_descriptor(1000);
     if (fd < 0) {
       close();
-      return -1;
+      return static_cast<size_t>(-1);
     }
 
     int sslStatus;
     do {
       auto fd_ssl = get_ssl(fd);
       if (IsNullOrEmpty(fd_ssl)) {
-        return -1;
+        return static_cast<size_t>(-1);
       }
-      status = SSL_read(fd_ssl, buf, buflen);
+      status = SSL_read(fd_ssl, buf, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
     if (status < 0)
       break;
 
-    buflen -= status;
+    buflen -= gsl::narrow<size_t>(status);
     buf += status;
-    total_read += status;
+    total_read += gsl::narrow<size_t>(status);
   }
 
   return total_read;
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f9af7f9..2fac08b 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -235,12 +235,10 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
 }
 
 bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
-  int ret;
-
   org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
 
-  ret = outStream.read(uuid_);
-  if (ret <= 0) {
+  auto ret = outStream.read(uuid_);
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -272,22 +270,22 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   }
 
   ret = outStream.read(this->_componentId);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->_componentType);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->flow_uuid_);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
   ret = outStream.read(this->_details);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -301,19 +299,19 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   for (uint32_t i = 0; i < numAttributes; i++) {
     std::string key;
     ret = outStream.read(key);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     std::string value;
     ret = outStream.read(value);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     this->_attributes[key] = value;
   }
 
   ret = outStream.read(this->_contentFullPath);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -328,7 +326,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
   }
 
   ret = outStream.read(this->_sourceQueueIdentifier);
-  if (ret <= 0) {
+  if (ret == static_cast<size_t>(-1)) {
     return false;
   }
 
@@ -343,7 +341,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier parentUUID;
       ret = outStream.read(parentUUID);
-      if (ret <= 0) {
+      if (ret == static_cast<size_t>(-1)) {
         return false;
       }
       this->addParentUuid(parentUUID);
@@ -356,23 +354,23 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
     for (uint32_t i = 0; i < number; i++) {
       utils::Identifier childUUID;
       ret = outStream.read(childUUID);
-      if (ret <= 0) {
+      if (ret == static_cast<size_t>(-1)) {
         return false;
       }
       this->addChildUuid(childUUID);
     }
   } else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
     ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
   } else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
     ret = outStream.read(this->_transitUri);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
     ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
-    if (ret <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       return false;
     }
   }
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7ea213f..0eb1f7e 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -342,40 +342,40 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
     }
 
     uint32_t number;
-    status = peer_->read(number);
+    auto ret = peer_->read(number);
 
-    if (status <= 0) {
+    if (ret == static_cast<size_t>(-1)) {
       tearDown();
       return false;
     }
 
     for (uint32_t i = 0; i < number; i++) {
       std::string host;
-      status = peer_->read(host);
-      if (status <= 0) {
+      ret = peer_->read(host);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint32_t port;
-      status = peer_->read(port);
-      if (status <= 0) {
+      ret = peer_->read(port);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint8_t secure;
-      status = peer_->read(secure);
-      if (status <= 0) {
+      ret = peer_->read(secure);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
       uint32_t count;
-      status = peer_->read(count);
-      if (status <= 0) {
+      ret = peer_->read(count);
+      if (ret == static_cast<size_t>(-1)) {
         tearDown();
         return false;
       }
-      PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
-      peers.push_back(std::move(status));
+      PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
+      peers.push_back(std::move(peer_status));
       logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
     }
 
@@ -397,15 +397,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
   int RawSiteToSiteClient::readRequestType(RequestType &type) {
     std::string requestTypeStr;
 
-    int ret = peer_->read(requestTypeStr);
-
-    if (ret <= 0)
-      return ret;
+    const auto ret = peer_->read(requestTypeStr);
+    if (ret == static_cast<size_t>(-1))
+      return gsl::narrow_cast<int>(ret);
 
     for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
       if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
         type = (RequestType) i;
-        return ret;
+        return gsl::narrow_cast<int>(ret);
       }
     }
 
@@ -417,7 +416,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac
 }
 
 int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
-  return writeResponse(transaction, code, message);
+  return writeResponse(transaction, code, std::move(message));
 }
 
 bool RawSiteToSiteClient::negotiateCodec() {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 1833144..fb24f73 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -32,7 +32,7 @@ namespace sitetosite {
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
 
-  int ret = peer_->read(firstByte);
+  auto ret = peer_->read(firstByte);
 
   if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
     return -1;
@@ -48,27 +48,25 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
 
   ret = peer_->read(thirdByte);
 
-  if (ret <= 0)
-    return ret;
+  if (ret != static_cast<size_t>(-1))
+    return gsl::narrow_cast<int>(ret);
 
   code = (RespondCode) thirdByte;
 
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
-  if (resCode == NULL) {
-    // Not a valid respond code
+  if (!resCode) {
     return -1;
   }
   if (resCode->hasDescription) {
     ret = peer_->read(message);
-    if (ret <= 0)
+    if (ret != static_cast<size_t>(-1))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
 }
 
 void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   auto it = this->known_transactions_.find(transactionID);
 
@@ -85,7 +83,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
 int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
 
-  if (resCode == NULL) {
+  if (!resCode) {
     // Not a valid respond code
     return -1;
   }
@@ -205,7 +203,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
 
 bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
   int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -529,8 +527,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
 }
 
 bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
-  int ret;
-  std::shared_ptr<Transaction> transaction = NULL;
+  std::shared_ptr<Transaction> transaction;
 
   if (peer_state_ != READY) {
     bootstrap();
@@ -568,9 +565,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     RespondCode code;
     std::string message;
 
-    ret = readResponse(transaction, code, message);
-
-    if (ret <= 0) {
+    if (readResponse(transaction, code, message) <= 0) {
       return false;
     }
     if (code == CONTINUE_TRANSACTION) {
@@ -595,8 +590,8 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   // start to read the packet
   uint32_t numAttributes;
-  ret = transaction->getStream().read(numAttributes);
-  if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+  auto ret = transaction->getStream().read(numAttributes);
+  if (ret != static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {
     return false;
   }
 
@@ -606,11 +601,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
     std::string key;
     std::string value;
     ret = transaction->getStream().read(key, true);
-    if (ret <= 0) {
+    if (ret != static_cast<size_t>(-1)) {
       return false;
     }
     ret = transaction->getStream().read(value, true);
-    if (ret <= 0) {
+    if (ret != static_cast<size_t>(-1)) {
       return false;
     }
     packet->_attributes[key] = value;
@@ -619,7 +614,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
 
   uint64_t len;
   ret = transaction->getStream().read(len);
-  if (ret <= 0) {
+  if (ret != static_cast<size_t>(-1)) {
     return false;
   }
 
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 33c9634..a64c3ac 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   if (stream->size() > 0) {
     std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
     readFully(buffer.get(), stream->size());
-    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size()));
-    return stream->size();
+    stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size());
+    return gsl::narrow<int64_t>(stream->size());
   }
-  return size_.load();
+  return gsl::narrow<int64_t>(size_.load());
 }
 
 int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str
   std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
   auto written = readFully(buffer.get(), size_);
   stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written));
-  return stream->size();
+  return gsl::narrow<int64_t>(stream->size());
 }
 
 void StreamOutputCallback::write(char *data, size_t size) {
@@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) {
   write_and_notify(data, size);
 }
 
-const std::vector<char> ByteOutputCallback::to_string() {
+std::vector<char> ByteOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.resize(size_.load());
   readFully(buffer.data(), size_.load());
diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp
index 6dfa565..bbd1265 100644
--- a/libminifi/src/utils/FileOutputCallback.cpp
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -34,7 +34,7 @@ int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
   return size_.load();
 }
 
-const std::vector<char> FileOutputCallback::to_string() {
+std::vector<char> FileOutputCallback::to_string() {
   std::vector<char> buffer;
   buffer.insert(std::end(buffer), std::begin(file_), std::end(file_));
   return buffer;
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index 2b4bf73..312ca76 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -26,14 +26,14 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
  public:
   explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
 
-  int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+  size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
     uint8_t tmpBuffer[4096]{};
     std::size_t remaining_len = len;
-    int total_read = 0;
+    size_t total_read = 0;
     while (remaining_len > 0) {
-      auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer))));
+      const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return ret;
       remaining_len -= ret;
       total_read += ret;
       auto prevSize = buffer_.size();
@@ -44,7 +44,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
   }
 
   int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), stream->size());
+    return static_cast<int64_t>(write(*stream.get(), stream->size()));
   }
 
  private:
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 76507df..3d0da83 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,29 +41,23 @@
 #include "processors/PutFile.h"
 #include "utils/file/FileUtils.h"
 #include "../Utils.h"
+#include "utils/gsl.h"
 
 class ReadCallback: public minifi::InputStreamCallback {
  public:
-  explicit ReadCallback(size_t size) :
-      read_size_(0) {
-    buffer_size_ = size;
-    buffer_ = new uint8_t[buffer_size_];
-    archive_buffer_ = nullptr;
-    archive_buffer_size_ = 0;
-  }
-  ~ReadCallback() {
-    if (buffer_)
-      delete[] buffer_;
-    if (archive_buffer_)
-      delete[] archive_buffer_;
-  }
-  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+  explicit ReadCallback(size_t size)
+      :buffer_size_{size}
+  {}
+  ~ReadCallback() override {
+    delete[] buffer_;
+    delete[] archive_buffer_;
+  }
+  int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
     int64_t total_read = 0;
-    int64_t ret = 0;
     do {
-      ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_));
+      const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return -1;
       read_size_ += gsl::narrow<size_t>(ret);
       total_read += ret;
     } while (buffer_size_ != read_size_);
@@ -78,18 +72,18 @@ class ReadCallback: public minifi::InputStreamCallback {
     struct archive_entry *ae;
 
     REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
-    int size = gsl::narrow<int>(archive_entry_size(ae));
+    const auto size = archive_entry_size(ae);
     archive_buffer_ = new char[size];
     archive_buffer_size_ = size;
-    archive_read_data(a, archive_buffer_, size);
+    archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size));
     archive_read_free(a);
   }
 
-  uint8_t *buffer_;
   size_t buffer_size_;
-  size_t read_size_;
-  char *archive_buffer_;
-  int archive_buffer_size_;
+  uint8_t *buffer_ = new uint8_t[buffer_size_];
+  size_t read_size_ = 0;
+  char *archive_buffer_ = nullptr;
+  int64_t archive_buffer_size_ = 0;
 };
 
 /**
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 60e233f..074b153 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback {
     REQUIRE(size_ + len <= capacity_);
     int total_read = 0;
     do {
-      auto ret = input.read(end(), gsl::narrow<int>(len));
+      const auto ret = input.read(end(), len);
       if (ret == 0) break;
-      if (ret < 0) return ret;
+      if (ret == static_cast<size_t>(-1)) return -1;
       size_ += ret;
       len -= ret;
       total_read += ret;
@@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
     return total_read;
   }
   int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
-    return write(*stream.get(), capacity_);
+    return write(*stream, capacity_);
   }
 
  private:
@@ -111,8 +111,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
   class ArchiveEntryReader {
    public:
     explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
-    int read(uint8_t* out, std::size_t len) {
-      return gsl::narrow<int>(archive_read_data(arch, out, len));
+    size_t read(uint8_t* out, std::size_t len) {
+      return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len));
     }
    private:
     archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 059dfa9..d2e6d51 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -56,12 +56,11 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
   str = "";
   uint8_t buffer[4096]{};
   while (true) {
-    auto ret = stream->read(buffer, sizeof(buffer));
+    const auto ret = stream->read(buffer, sizeof(buffer));
     REQUIRE(ret >= 0);
-    if (ret == 0) {
-      break;
-    }
-    str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)};
+    REQUIRE(ret != static_cast<size_t>(-1));
+    if (ret == 0) { break; }
+    str += std::string{reinterpret_cast<char*>(buffer), ret};
   }
   return stream;
 }
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 6409f3e..7b5c75c 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -104,7 +104,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
 }
 
 TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
   std::string readstr;
 
   // -1 tell us we have an invalid stream
-  REQUIRE(read_stream->read(readstr) == -1);
+  REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
 }
 
 TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 88b3929..5e7c3b6 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
 
   minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
 
-  REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+  REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1));
 }
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index b13dcfa..1ff4ab9 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
 
   data = verifybuffer.data();
 
@@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
 
   std::vector<uint8_t> verifybuffer;
 
-  REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1);
+  REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1));
 
   data = verifybuffer.data();
 
-  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+  REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty());
 
   std::remove(ss.str().c_str());
 }
@@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") {
 
   minifi::io::FileStream stream(path, 0, true);
   std::vector<uint8_t> readBuffer;
-  REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+  REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
 
   uint8_t* data = readBuffer.data();
 
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
   REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE(stream.read(readBuffer, 1) == -1);
+  REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
 }
 
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
   std::vector<uint8_t> readBuffer;
   stream.seek(0);
-  REQUIRE_FALSE(stream.read(readBuffer, 11) == -1);
+  REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1));
   REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
   stream.seek(0);
-  REQUIRE(stream.read(nullptr, 11) == -1);
+  REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1));
   REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
 }
 
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index b4d5e8f..61257e2 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -59,7 +59,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
    * @param len length to read
    * @return resulting read size
    **/
-  int read(uint8_t *value, int len) override {
+  size_t read(uint8_t *value, size_t len) override {
     return server_responses_.read(value, len);
   }
 
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index b339a28..ca60e7f 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
     std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
                                                                                            *content_repo);
     auto stream = (*content_repo)->read(*claim);
-    return stream->read(target, size);
+    return static_cast<int>(stream->read(target, static_cast<size_t>(size)));
   } else {
     file_buffer fb = file_to_buffer(ff->contentLocation);
     if (size < 0) {
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index a1f8a08..ad3c086 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -39,6 +39,7 @@
 #include "core/cstructs.h"
 #include "RandomServerSocket.h"
 #include "core/log.h"
+#include "utils/gsl.h"
 
 #define FMT_DEFAULT fmt_lower
 
@@ -145,15 +146,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
   size_t read_len = 0;
   while(!found_codec) {
     uint8_t handshake_data[1000];
-    int actual_len = stream->read(handshake_data+read_len, 1000-read_len);
-    if(actual_len <= 0) {
+    const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
+    if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) {
       continue;
     }
     read_len += actual_len;
-    std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
-    auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
+    const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
+    const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
     if(it != incoming_data.end()){
-      size_t idx = std::distance(incoming_data.begin(), it);
+      const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it));
       // Actual version follows the string as an uint32_t // that should be the end of the buffer
       found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
     }

[nifi-minifi-cpp] 02/17: fix s2s issues

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit ee2ac71af1822c1da5b78bf6d3a032c3543546a6
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Tue Mar 16 11:58:51 2021 +0100

    fix s2s issues
---
 libminifi/src/io/BufferStream.cpp              |   2 +-
 libminifi/src/sitetosite/RawSocketProtocol.cpp | 133 ++++++++++++++-----------
 libminifi/src/sitetosite/SiteToSiteClient.cpp  |  14 +--
 3 files changed, 78 insertions(+), 71 deletions(-)

diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 12220ef..1dd64ea 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -39,7 +39,7 @@ int BufferStream::write(const uint8_t *value, int size) {
 size_t BufferStream::read(uint8_t *buf, size_t len) {
   const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
   const auto readlen = std::min(len, bytes_available_in_buffer);
-  auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+  const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
   std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
 
   // increase offset for the next read
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 0eb1f7e..c75eb17 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -114,28 +114,31 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
 
   logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);
 
-  int ret = peer_->write(getResourceName());
-
-  logger_->log_trace("result of writing resource name is %i", ret);
-  if (ret <= 0) {
-    logger_->log_debug("result of writing resource name is %i", ret);
-    // tearDown();
-    return false;
+  {
+    const auto ret = peer_->write(getResourceName());
+    logger_->log_trace("result of writing resource name is %i", ret);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing resource name is %i", ret);
+      // tearDown();
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of writing version is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of writing version status code  %i", ret);
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      logger_->log_debug("result of writing version status code  %i", ret);
+      return false;
+    }
   }
   logger_->log_debug("status code is %i", statusCode);
   switch (statusCode) {
@@ -144,9 +147,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
 
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
@@ -178,36 +183,40 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
 
   logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);
 
-  int ret = peer_->write(getCodecResourceName());
-
-  if (ret <= 0) {
-    logger_->log_debug("result of getCodecResourceName is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(getCodecResourceName());
+    if (ret <= 0) {
+      logger_->log_debug("result of getCodecResourceName is %i", ret);
+      return false;
+    }
   }
 
-  ret = peer_->write(_currentCodecVersion);
-
-  if (ret <= 0) {
-    logger_->log_debug("result of _currentCodecVersion is %i", ret);
-    return false;
+  {
+    const auto ret = peer_->write(_currentCodecVersion);
+    if (ret <= 0) {
+      logger_->log_debug("result of _currentCodecVersion is %i", ret);
+      return false;
+    }
   }
 
   uint8_t statusCode;
-  ret = peer_->read(statusCode);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->read(statusCode);
+    if (ret == 0 || ret == static_cast<size_t>(-1)) {
+      return false;
+    }
   }
-
   switch (statusCode) {
     case RESOURCE_OK:
       logger_->log_trace("Site2Site Codec Negotiate version OK");
       return true;
     case DIFFERENT_RESOURCE_VERSION:
       uint32_t serverVersion;
-      ret = peer_->read(serverVersion);
-      if (ret <= 0) {
-        return false;
+      {
+        const auto ret = peer_->read(serverVersion);
+        if (ret == 0 || ret == static_cast<size_t>(-1)) {
+          return false;
+        }
       }
       logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
 
@@ -237,10 +246,11 @@ bool RawSiteToSiteClient::handShake() {
   logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
   _commsIdentifier = id_generator_->generate();
 
-  int ret = peer_->write(_commsIdentifier);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = peer_->write(_commsIdentifier);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string> properties;
@@ -257,27 +267,33 @@ bool RawSiteToSiteClient::handShake() {
   }
 
   if (_currentVersion >= 3) {
-    ret = peer_->write(peer_->getURL());
+    const auto ret = peer_->write(peer_->getURL());
     if (ret <= 0) {
       return false;
     }
   }
 
-  uint32_t size = gsl::narrow<uint32_t>(properties.size());
-  ret = peer_->write(size);
-  if (ret <= 0) {
-    return false;
+  {
+    const auto size = gsl::narrow<uint32_t>(properties.size());
+    const auto ret = peer_->write(size);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::map<std::string, std::string>::iterator it;
   for (it = properties.begin(); it != properties.end(); it++) {
-    ret = peer_->write(it->first);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->first);
+      if (ret <= 0) {
+        return false;
+      }
     }
-    ret = peer_->write(it->second);
-    if (ret <= 0) {
-      return false;
+    {
+      const auto ret = peer_->write(it->second);
+      if (ret <= 0) {
+        return false;
+      }
     }
     logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
   }
@@ -285,10 +301,11 @@ bool RawSiteToSiteClient::handShake() {
   RespondCode code;
   std::string message;
 
-  ret = readRespond(nullptr, code, message);
-
-  if (ret <= 0) {
-    return false;
+  {
+    const auto ret = readRespond(nullptr, code, message);
+    if (ret <= 0) {
+      return false;
+    }
   }
 
   std::string error;
@@ -310,13 +327,11 @@ bool RawSiteToSiteClient::handShake() {
     // Unknown error
     default:
       logger_->log_error("HandShake Failed because of unknown respond code %d", code);
-      ret = -1;
       return false;
   }
 
   // All known error cases handled here
   logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
-  ret = -1;
   return false;
 }
 
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index fb24f73..b009c18 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,28 +31,21 @@ namespace sitetosite {
 
 int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
   uint8_t firstByte;
-
   auto ret = peer_->read(firstByte);
-
-  if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
+  if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != CODE_SEQUENCE_VALUE_1)
     return -1;
 
   uint8_t secondByte;
-
   ret = peer_->read(secondByte);
-
-  if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
+  if (ret == 0 || ret == static_cast<size_t>(-1) || secondByte != CODE_SEQUENCE_VALUE_2)
     return -1;
 
   uint8_t thirdByte;
-
   ret = peer_->read(thirdByte);
-
-  if (ret != static_cast<size_t>(-1))
+  if (ret == static_cast<size_t>(-1))
     return gsl::narrow_cast<int>(ret);
 
   code = (RespondCode) thirdByte;
-
   RespondCodeContext *resCode = this->getRespondCodeContext(code);
   if (!resCode) {
     return -1;
@@ -69,7 +62,6 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
   std::shared_ptr<Transaction> transaction;
 
   auto it = this->known_transactions_.find(transactionID);
-
   if (it == known_transactions_.end()) {
     return;
   } else {

[nifi-minifi-cpp] 10/17: various review feedback-related changes

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 722180e71533b4f3aa15c00c51c81c1bc330b1b9
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed Mar 17 16:28:12 2021 +0100

    various review feedback-related changes
---
 extensions/standard-processors/processors/ExtractText.cpp | 2 +-
 libminifi/src/c2/ControllerSocketProtocol.cpp             | 6 +++---
 libminifi/src/io/tls/SecureDescriptorStream.cpp           | 3 ++-
 libminifi/src/sitetosite/SiteToSiteClient.cpp             | 2 +-
 libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp | 8 ++++----
 5 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 9774cf8..0e2a458 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -125,7 +125,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
   if (sizeLimitStr.empty())
     size_limit = DEFAULT_SIZE_LIMIT;
   else if (sizeLimitStr != "0")
-    size_limit = gsl::narrow_cast<size_t>(std::stoi(sizeLimitStr));
+    size_limit = static_cast<size_t>(std::stoi(sizeLimitStr));
 
   std::ostringstream contentStream;
 
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index d441182..03e8306 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -252,11 +252,11 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
 void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
   for (const auto &payload_content : content) {
     if (payload_content.name == "Components") {
-      for (const auto& content_ : payload_content.operation_arguments) {
+      for (const auto& operation_argument : payload_content.operation_arguments) {
         bool is_enabled = false;
-        minifi::utils::StringUtils::StringToBool(content_.second.to_string(), is_enabled);
+        minifi::utils::StringUtils::StringToBool(operation_argument.second.to_string(), is_enabled);
         std::lock_guard<std::mutex> lock(controller_mutex_);
-        component_map_[content_.first] = is_enabled;
+        component_map_[operation_argument.first] = is_enabled;
       }
     }
   }
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index 66d23b5..5acf950 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -82,7 +82,8 @@ size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
     int status;
     int sslStatus;
     do {
-      status = SSL_read(ssl_, writepos, gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(ssl_, writepos, ssl_read_size);
       sslStatus = SSL_get_error(ssl_, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index c7cd919..60cef54 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -58,7 +58,7 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
   }
   if (resCode->hasDescription) {
     const auto ret = peer_->read(message);
-    if (ret == 0 || !io::isError(ret))
+    if (ret == 0 || io::isError(ret))
       return -1;
   }
   return gsl::narrow<int>(3 + message.size());
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 4d5e5a3..38b7690 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -103,7 +103,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
 
   std::string readstr;
 
-  // error tell us we have an invalid stream
+  // error tells us we have an invalid stream
   REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
@@ -139,7 +139,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
+  // error tells us we have an invalid stream
   REQUIRE(minifi::io::isError(read_stream->read(readstr)));
 }
 
@@ -179,7 +179,7 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
+  // error tells us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");
@@ -227,7 +227,7 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
   std::string readstr;
 
-  // -1 tell us we have an invalid stream
+  // error tells us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");

[nifi-minifi-cpp] 06/17: Update Provenance.h: handle uuid read errors

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e4090707245f5b4713ae9b80a4ab72dcfe7bffbd
Author: Márton Szász <sz...@gmail.com>
AuthorDate: Tue Mar 16 15:30:15 2021 +0000

    Update Provenance.h: handle uuid read errors
    
    Co-authored-by: Ferenc Gerlits <fg...@users.noreply.github.com>
---
 libminifi/include/provenance/Provenance.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 60ee8c2..c2fd38c 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -367,7 +367,7 @@ class ProvenanceEventRecord : public core::SerializableComponent {
 
     std::string uuid;
     const auto uuidret = outStream.read(uuid);
-    if (uuidret <= 0) {
+    if (uuidret == 0 || uuidret == static_cast<size_t>(-1)) {
       return 0;
     }
 

[nifi-minifi-cpp] 15/17: Update extensions/libarchive/CompressContent.h

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit e8df736bc9cf284a8fc1ffe4e36ecbf6e8db586f
Author: Márton Szász <sz...@gmail.com>
AuthorDate: Thu Mar 25 10:22:43 2021 +0000

    Update extensions/libarchive/CompressContent.h
    
    Co-authored-by: adamdebreceni <64...@users.noreply.github.com>
---
 extensions/libarchive/CompressContent.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 8f456d8..aae9492 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -384,7 +384,7 @@ public:
               break;
             } else {
               const auto writeret = outputStream_->write(buffer.data(), ret);
-              if (!io::isError(writeret) && gsl::narrow<size_t>(writeret) != ret) {
+              if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
                 return -1;
               }
               read_size += ret;

[nifi-minifi-cpp] 11/17: fix issues

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3010f2b0ad05dc3eec39d4c98c8e8659894b3268
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 18 13:49:15 2021 +0100

    fix issues
---
 extensions/http-curl/tests/CivetStream.h                    |  2 +-
 extensions/mqtt/processors/ConvertJSONAck.h                 |  2 +-
 extensions/mqtt/processors/PublishMQTT.h                    |  1 +
 extensions/opc/src/putopc.cpp                               |  1 +
 extensions/sql/processors/PutSQL.cpp                        |  1 +
 .../standard-processors/processors/ExecuteProcess.cpp       |  1 +
 extensions/standard-processors/processors/ExtractText.cpp   |  4 ++--
 extensions/tensorflow/TFExtractTopLabels.cpp                |  1 +
 libminifi/src/io/FileStream.cpp                             |  2 ++
 libminifi/src/io/InputStream.cpp                            |  2 +-
 libminifi/src/io/tls/SecureDescriptorStream.cpp             |  2 ++
 libminifi/src/io/tls/TLSSocket.cpp                          |  8 +++++---
 libminifi/src/sitetosite/RawSocketProtocol.cpp              | 13 +++++--------
 libminifi/test/rocksdb-tests/ContentSessionTests.cpp        |  1 -
 libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp   |  2 --
 15 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index e333b54..9513125 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -54,7 +54,7 @@ class CivetStream : public io::InputStream {
   size_t read(uint8_t *buf, size_t buflen) override {
     const auto ret = mg_read(conn, buf, buflen);
     if (ret < 0) return static_cast<size_t>(-1);
-    return gsl::narrow_cast<size_t>(ret);
+    return gsl::narrow<size_t>(ret);
   }
 
  protected:
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 659af59..6c9d3cd 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -80,7 +80,7 @@ class ConvertJSONAck : public ConvertBase {
         return 0;
       buffer_.resize(stream->size());
       const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
-      return gsl::narrow<int64_t>(ret);
+      return !io::isError(ret) ? gsl::narrow<int64_t>(ret) : -1;
     }
     std::vector<char> buffer_;
   };
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index ba14c0f..745246f 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -31,6 +31,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "MQTTClient.h"
 #include "AbstractMQTTProcessor.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index daf0242..5ea3167 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -37,6 +37,7 @@
 #include "core/logging/LoggerConfiguration.h"
 #include "utils/Id.h"
 #include "utils/StringUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
diff --git a/extensions/sql/processors/PutSQL.cpp b/extensions/sql/processors/PutSQL.cpp
index ee8b4ce..9b44c5b 100644
--- a/extensions/sql/processors/PutSQL.cpp
+++ b/extensions/sql/processors/PutSQL.cpp
@@ -33,6 +33,7 @@
 #include "Exception.h"
 #include "data/DatabaseConnectors.h"
 #include "data/JSONSQLWriter.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index cc9212d..732ef7d 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -27,6 +27,7 @@
 #include "utils/StringUtils.h"
 #include "utils/TimeUtil.h"
 #include "core/TypedValues.h"
+#include "utils/gsl.h"
 
 #if defined(__clang__)
 #pragma clang diagnostic push
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 0e2a458..23e0fae 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -32,8 +32,8 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/FlowFile.h"
-
 #include "utils/RegexUtils.h"
+#include "utils/gsl.h"
 
 namespace org {
 namespace apache {
@@ -164,7 +164,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
     const size_t maxCaptureSize = [this] {
       uint64_t val;
       ctx_->getProperty(MaxCaptureGroupLen.getName(), val);
-      return size_t{val};
+      return gsl::narrow<size_t>(val);
     }();
 
     std::string contentStr = contentStream.str();
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index b73a3e5..3156ba9 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -137,6 +137,7 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
 
   while (total_read < stream->size()) {
     const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), buf_size);
+    if (io::isError(read)) break;
     for (size_t i = 0; i < read; i++) {
       if (buf[i] == '\n' || total_read + i == stream->size()) {
         labels_->emplace_back(label.substr(0, label_size));
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index c88f8dd..32b3e1b 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -25,6 +25,8 @@
 #include "io/FileStream.h"
 #include "io/InputStream.h"
 #include "io/OutputStream.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index be0a273..2361451 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -53,7 +53,7 @@ size_t InputStream::read(bool &value) {
 size_t InputStream::read(utils::Identifier &value) {
   std::string uuidStr;
   const auto ret = read(uuidStr);
-  if (ret < 0) {
+  if (isError(ret)) {
     return ret;
   }
   auto optional_uuid = utils::Identifier::parse(uuidStr);
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index 5acf950..7aea5ab 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -23,6 +23,8 @@
 #include <string>
 #include <Exception.h>
 #include "io/validation.h"
+#include "utils/gsl.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 49d07ec..4deb3f6 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -373,7 +373,8 @@ size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/)
     }
     int sslStatus;
     do {
-      status = SSL_read(fd_ssl, buf + loc, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(fd_ssl, buf + loc, ssl_read_size);
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
 
@@ -431,11 +432,12 @@ size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
 
     int sslStatus;
     do {
-      auto fd_ssl = get_ssl(fd);
+      const auto fd_ssl = get_ssl(fd);
       if (IsNullOrEmpty(fd_ssl)) {
         return static_cast<size_t>(-1);
       }
-      status = SSL_read(fd_ssl, buf, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+      const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+      status = SSL_read(fd_ssl, buf, ssl_read_size);
       sslStatus = SSL_get_error(fd_ssl, status);
     } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
 
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 463a96f..b60b2f4 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -349,23 +349,21 @@ void RawSiteToSiteClient::tearDown() {
 
 bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
   if (establish() && handShake()) {
-    int status = writeRequestType(REQUEST_PEER_LIST);
-
-    if (status <= 0) {
+    if (writeRequestType(REQUEST_PEER_LIST) <= 0) {
       tearDown();
       return false;
     }
 
-    uint32_t number;
+    uint32_t number_of_peers;
     {
-      const auto ret = peer_->read(number);
+      const auto ret = peer_->read(number_of_peers);
       if (ret == 0 || io::isError(ret)) {
         tearDown();
         return false;
       }
     }
 
-    for (uint32_t i = 0; i < number; i++) {
+    for (uint32_t i = 0; i < number_of_peers; i++) {
       std::string host;
       {
         const auto ret = peer_->read(host);
@@ -398,8 +396,7 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
           return false;
         }
       }
-      PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
-      peers.push_back(std::move(peer_status));
+      peers.push_back(PeerStatus{std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true});
       logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
     }
 
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 77b7c73..f63947b 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -57,7 +57,6 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
   uint8_t buffer[4096]{};
   while (true) {
     const auto ret = stream->read(buffer, sizeof(buffer));
-    REQUIRE(ret >= 0);
     REQUIRE(!minifi::io::isError(ret));
     if (ret == 0) { break; }
     str += std::string{reinterpret_cast<char*>(buffer), ret};
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 38b7690..712d9f8 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -179,7 +179,6 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
 
   std::string readstr;
 
-  // error tells us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");
@@ -227,7 +226,6 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
 
   std::string readstr;
 
-  // error tells us we have an invalid stream
   read_stream->read(readstr);
 
   REQUIRE(readstr == "well hello there");

[nifi-minifi-cpp] 04/17: fix compilation on some platforms with size_t not the same as uint64_t

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4c01eeaf718b9113554bf2c542265bccd3dcb030
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Tue Mar 16 13:20:45 2021 +0100

    fix compilation on some platforms with size_t not the same as uint64_t
---
 libminifi/src/io/BufferStream.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 1dd64ea..be0b68b 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -38,7 +38,7 @@ int BufferStream::write(const uint8_t *value, int size) {
 
 size_t BufferStream::read(uint8_t *buf, size_t len) {
   const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
-  const auto readlen = std::min(len, bytes_available_in_buffer);
+  const auto readlen = std::min(len, size_t{bytes_available_in_buffer});
   const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
   std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);