You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/05/26 12:46:09 UTC
[nifi-minifi-cpp] 01/17: convert InputStream::read to size_t
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 5c41fff9c012860cebb8c830c38057b93efef7dc
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Fri Mar 12 14:10:55 2021 +0100
convert InputStream::read to size_t
---
extensions/aws/processors/PutS3Object.h | 11 ++--
extensions/civetweb/processors/ListenHTTP.h | 17 +++---
extensions/http-curl/client/HTTPCallback.h | 2 +-
extensions/http-curl/client/HTTPStream.cpp | 9 ++--
extensions/http-curl/client/HTTPStream.h | 4 +-
extensions/http-curl/tests/CivetStream.h | 8 ++-
extensions/http-curl/tests/HTTPHandlers.h | 9 ++--
extensions/jni/jvm/JniReferenceObjects.h | 3 +-
extensions/libarchive/CompressContent.h | 49 +++++++++--------
extensions/libarchive/FocusArchiveEntry.cpp | 9 ++--
extensions/librdkafka/PublishKafka.cpp | 7 ++-
extensions/mqtt/processors/ConvertJSONAck.h | 13 ++---
extensions/mqtt/processors/ConvertUpdate.cpp | 2 +-
extensions/mqtt/processors/PublishMQTT.h | 18 ++++---
extensions/opc/src/putopc.cpp | 14 ++---
extensions/rocksdb-repos/RocksDbStream.cpp | 35 +++++-------
extensions/rocksdb-repos/RocksDbStream.h | 4 +-
extensions/script/python/PyBaseStream.cpp | 8 ++-
extensions/script/python/PyBaseStream.h | 2 +-
extensions/sftp/client/SFTPClient.cpp | 14 ++---
.../standard-processors/processors/ExtractText.cpp | 22 ++++----
.../standard-processors/processors/LogAttribute.h | 20 ++++---
.../standard-processors/processors/PutFile.cpp | 25 ++++-----
.../standard-processors/processors/PutFile.h | 2 +-
libminifi/include/io/AtomicEntryStream.h | 13 +++--
libminifi/include/io/BufferStream.h | 6 +--
libminifi/include/io/CRCStream.h | 6 +--
libminifi/include/io/ClientSocket.h | 4 +-
libminifi/include/io/DescriptorStream.h | 4 +-
libminifi/include/io/FileStream.h | 4 +-
libminifi/include/io/InputStream.h | 16 +++---
libminifi/include/io/Stream.h | 2 +-
libminifi/include/io/StreamPipe.h | 12 ++---
libminifi/include/io/tls/SecureDescriptorStream.h | 4 +-
libminifi/include/io/tls/TLSSocket.h | 4 +-
libminifi/include/provenance/Provenance.h | 16 +++---
libminifi/include/sitetosite/Peer.h | 2 +-
libminifi/include/sitetosite/SiteToSiteClient.h | 55 ++++++++-----------
libminifi/include/utils/ByteArrayCallback.h | 7 +--
libminifi/include/utils/Enum.h | 4 +-
libminifi/include/utils/FileOutputCallback.h | 2 +-
libminifi/src/FlowFileRecord.cpp | 14 +++--
libminifi/src/c2/ControllerSocketProtocol.cpp | 63 +++++++++++-----------
libminifi/src/core/ProcessSession.cpp | 10 ++--
libminifi/src/core/ProcessSessionReadCallback.cpp | 13 ++---
libminifi/src/io/BufferStream.cpp | 15 +++---
libminifi/src/io/ClientSocket.cpp | 21 ++++----
libminifi/src/io/DescriptorStream.cpp | 15 +++---
libminifi/src/io/FileStream.cpp | 22 ++++----
libminifi/src/io/InputStream.cpp | 27 +++++-----
libminifi/src/io/tls/SecureDescriptorStream.cpp | 41 +++++++-------
libminifi/src/io/tls/TLSSocket.cpp | 36 ++++++-------
libminifi/src/provenance/Provenance.cpp | 32 ++++++-----
libminifi/src/sitetosite/RawSocketProtocol.cpp | 35 ++++++------
libminifi/src/sitetosite/SiteToSiteClient.cpp | 35 ++++++------
libminifi/src/utils/ByteArrayCallback.cpp | 10 ++--
libminifi/src/utils/FileOutputCallback.cpp | 2 +-
libminifi/test/BufferReader.h | 10 ++--
.../test/archive-tests/CompressContentTests.cpp | 40 ++++++--------
libminifi/test/archive-tests/MergeFileTests.cpp | 10 ++--
.../test/rocksdb-tests/ContentSessionTests.cpp | 9 ++--
.../rocksdb-tests/DBContentRepositoryTests.cpp | 4 +-
.../test/rocksdb-tests/RocksDBStreamTests.cpp | 2 +-
libminifi/test/unit/FileStreamTests.cpp | 28 +++++-----
libminifi/test/unit/SiteToSiteHelper.h | 2 +-
nanofi/src/api/nanofi.cpp | 2 +-
nanofi/tests/CSite2SiteTests.cpp | 11 ++--
67 files changed, 458 insertions(+), 519 deletions(-)
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index ad3072a..e55ac1e 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -31,6 +31,7 @@
#include "S3Processor.h"
#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
template<typename T>
class S3TestsFixture;
@@ -94,20 +95,20 @@ class PutS3Object : public S3Processor {
auto data_stream = std::make_shared<std::stringstream>();
read_size_ = 0;
while (read_size_ < flow_size_) {
- auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
- int read_ret = stream->read(buffer, next_read_size);
- if (read_ret < 0) {
+ const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
+ const auto read_ret = stream->read(buffer.data(), next_read_size);
+ if (read_ret == static_cast<size_t>(-1)) {
return -1;
}
if (read_ret > 0) {
- data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
+ data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
read_size_ += read_ret;
} else {
break;
}
}
result_ = s3_wrapper_.putObject(options_, data_stream);
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
uint64_t flow_size_;
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index adf4a76..960185c 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -32,6 +32,7 @@
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/MinifiConcurrentQueue.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -48,7 +49,7 @@ class ListenHTTP : public core::Processor {
/*!
* Create a new processor
*/
- ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
+ explicit ListenHTTP(const std::string& name, const utils::Identifier& uuid = {})
: Processor(name, uuid),
logger_(logging::LoggerFactory<ListenHTTP>::getLogger()),
batch_size_(0) {
@@ -56,7 +57,7 @@ class ListenHTTP : public core::Processor {
callbacks_.log_access = &logAccess;
}
// Destructor
- virtual ~ListenHTTP();
+ ~ListenHTTP() override;
// Processor Name
static constexpr char const *ProcessorName = "ListenHTTP";
// Supported Properties
@@ -131,14 +132,12 @@ class ListenHTTP : public core::Processor {
}
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
out_str_->resize(stream->size());
- uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
- gsl::narrow<int>(stream->size()));
-
+ const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
}
- return num_read;
+ return gsl::narrow<int64_t>(num_read);
}
private:
@@ -148,7 +147,7 @@ class ListenHTTP : public core::Processor {
// Write callback for transferring data from HTTP request to content repo
class WriteCallback : public OutputStreamCallback {
public:
- WriteCallback(std::unique_ptr<io::BufferStream>);
+ explicit WriteCallback(std::unique_ptr<io::BufferStream>);
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
private:
@@ -159,11 +158,11 @@ class ListenHTTP : public core::Processor {
try {
struct mg_context* ctx = mg_get_context(conn);
/* CivetServer stores 'this' as the userdata when calling mg_start */
- CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
+ auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx));
if (server == nullptr) {
return 0;
}
- std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
+ auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
if (logger == nullptr) {
return 0;
}
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 921a8c5..242990e 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -83,7 +83,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
if (stream->size() > 0) {
vec.resize(stream->size());
- stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+ stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
}
return processInner(std::move(vec));
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 5ba7f3f..f9b8f49 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -51,7 +51,7 @@ void HttpStream::close() {
http_read_callback_.close();
}
-void HttpStream::seek(uint64_t /*offset*/) {
+void HttpStream::seek(size_t /*offset*/) {
// seek is an unnecessary part of this implementation
throw std::logic_error{"HttpStream::seek is unimplemented"};
}
@@ -81,8 +81,7 @@ int HttpStream::write(const uint8_t *value, int size) {
}
}
-int HttpStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t HttpStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
@@ -97,10 +96,10 @@ int HttpStream::read(uint8_t *buf, int buflen) {
started_ = true;
}
}
- return gsl::narrow<int>(http_read_callback_.readFully((char*) buf, buflen));
+ return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
} else {
- return -1;
+ return static_cast<size_t>(-1);
}
}
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index 711d1d2..8d49d02 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -77,7 +77,7 @@ class HttpStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return written;
@@ -91,7 +91,7 @@ class HttpStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index 9cbba27..e333b54 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -28,6 +28,8 @@
#include "io/BaseStream.h"
#include "civetweb.h"
#include "CivetServer.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -49,8 +51,10 @@ class CivetStream : public io::InputStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override {
- return mg_read(conn, buf, buflen);
+ size_t read(uint8_t *buf, size_t buflen) override {
+ const auto ret = mg_read(conn, buf, buflen);
+ if (ret < 0) return static_cast<size_t>(-1);
+ return gsl::narrow_cast<size_t>(ret);
}
protected:
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index cedb834..5c49205 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -214,7 +214,7 @@ class FlowFileResponder : public ServerAwareHandler {
minifi::io::CivetStream civet_stream(conn);
minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
uint32_t num_attributes;
- int read;
+ size_t read;
uint64_t total_size = 0;
read = stream.read(num_attributes);
if(!isServerRunning())return false;
@@ -241,9 +241,10 @@ class FlowFileResponder : public ServerAwareHandler {
flow->data.resize(gsl::narrow<size_t>(length));
flow->total_size = total_size;
- read = stream.read(flow->data.data(), gsl::narrow<int>(length));
- if(!isServerRunning())return false;
- assert(read == gsl::narrow<int>(length));
+ read = stream.read(flow->data.data(), length);
+ if (!isServerRunning()) return false;
+ (void)read;
+ assert(read == length);
if (!invalid_checksum) {
site2site_rest_resp = std::to_string(stream.getCRC());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index 67798e9..f81391b 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -142,7 +142,8 @@ class JniByteInputStream : public minifi::InputStreamCallback {
int writtenOffset = 0;
int read = 0;
do {
- int actual = stream_->read(buffer_, std::min(remaining, buffer_size_));
+ // JNI takes size as int, there's not much we can do here to support 2GB+ sizes
+ int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_)));
if (actual <= 0) {
if (read == 0) {
stream_ = nullptr;
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index fc7851e..df7fc75 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -21,6 +21,7 @@
#define __COMPRESS_CONTENT_H__
#include <cinttypes>
+#include <utility>
#include "archive_entry.h"
#include "archive.h"
@@ -56,7 +57,7 @@ public:
, encapsulateInTar_(false) {
}
// Destructor
- virtual ~CompressContent() = default;
+ ~CompressContent() override = default;
// Processor Name
static constexpr char const* ProcessorName = "CompressContent";
// Supported Properties
@@ -94,8 +95,8 @@ public:
ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
}
- ~ReadCallbackCompress() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ ~ReadCallbackCompress() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
uint8_t buffer[4096U];
int64_t ret = 0;
uint64_t read_size = 0;
@@ -107,13 +108,13 @@ public:
return -1;
}
while (read_size < flow_->getSize()) {
- ret = stream->read(buffer, sizeof(buffer));
- if (ret < 0) {
+ const auto readret = stream->read(buffer, sizeof(buffer));
+ if (readret == static_cast<size_t>(-1)) {
status_ = -1;
return -1;
}
- if (ret > 0) {
- ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
+ if (readret > 0) {
+ ret = archive_write_data(arch_, buffer, readret);
if (ret < 0) {
logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
status_ = -1;
@@ -124,7 +125,7 @@ public:
break;
}
}
- return read_size;
+ return gsl::narrow<int64_t>(read_size);
}
std::shared_ptr<core::FlowFile> flow_;
struct archive *arch_;
@@ -135,25 +136,23 @@ public:
// Nest Callback Class for read stream from flow for decompress
class ReadCallbackDecompress: public InputStreamCallback {
public:
- ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
- read_size_(0), offset_(0), flow_(flow) {
- origin_offset_ = flow_->getOffset();
+ explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
+ flow_(std::move(flow)) {
}
- ~ReadCallbackDecompress() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ ~ReadCallbackDecompress() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
read_size_ = 0;
stream->seek(offset_);
- int readRet = stream->read(buffer_, sizeof(buffer_));
+ const auto readRet = stream->read(buffer_, sizeof(buffer_));
read_size_ = readRet;
- if (readRet > 0) {
- offset_ += read_size_;
+ if (readRet != static_cast<size_t>(-1)) {
+ offset_ += readRet;
}
- return readRet;
+ return gsl::narrow<int64_t>(readRet);
}
- int64_t read_size_;
- uint8_t buffer_[8192];
- uint64_t offset_;
- uint64_t origin_offset_;
+ size_t read_size_ = 0;
+ uint8_t buffer_[8192] = {0};
+ size_t offset_ = 0;
std::shared_ptr<core::FlowFile> flow_;
};
// Nest Callback Class for write stream
@@ -383,8 +382,8 @@ public:
std::vector<uint8_t> buffer(16 * 1024U);
int64_t read_size = 0;
while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
- int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
- if (ret < 0) {
+ const auto ret = inputStream->read(buffer.data(), buffer.size());
+ if (ret == static_cast<size_t>(-1)) {
return -1;
} else if (ret == 0) {
break;
@@ -414,7 +413,7 @@ public:
success_ = filterStream->isFinished();
- return flow_->getSize();
+ return gsl::narrow<int64_t>(flow_->getSize());
}
};
@@ -440,7 +439,7 @@ private:
void processFlowFile(const std::shared_ptr<core::FlowFile>& flowFile, const std::shared_ptr<core::ProcessSession>& session);
std::shared_ptr<logging::Logger> logger_;
- int compressLevel_;
+ int compressLevel_{};
CompressionMode compressMode_;
ExtendedCompressionFormat compressFormat_;
bool updateFileName_;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 59a43e0..9452dce 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -34,6 +34,7 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -150,20 +151,20 @@ typedef struct {
la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
auto data = static_cast<FocusArchiveEntryReadData *>(d);
*buf = data->buf;
- int read = 0;
- int last_read = 0;
+ size_t read = 0;
+ size_t last_read = 0;
do {
last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
read += last_read;
- } while (data->processor->isRunning() && last_read > 0 && read < 8196);
+ } while (data->processor->isRunning() && last_read > 0 && last_read != static_cast<size_t>(-1) && read < 8196);
if (!data->processor->isRunning()) {
archive_set_error(a, EINTR, "Processor shut down during read");
return -1;
}
- return read;
+ return gsl::narrow<la_ssize_t>(read);
}
int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b83d028..bbb66a8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback {
}
for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
- const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
- if (readRet < 0) {
+ const auto readRet = stream->read(buffer.data(), buffer.size());
+ if (readRet == static_cast<size_t>(-1)) {
status_ = -1;
error_ = "Failed to read from stream";
return read_size_;
}
-
- if (readRet <= 0) { break; }
+ if (readRet == 0) { break; }
const auto err = produce(segment_num, buffer, readRet);
if (err) {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 47f53d0..659af59 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -31,6 +31,8 @@
#include "MQTTClient.h"
#include "c2/protocols/RESTProtocol.h"
#include "ConvertBase.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase {
logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
}
// Destructor
- virtual ~ConvertJSONAck() = default;
+ ~ConvertJSONAck() override = default;
// Processor Name
static constexpr char const* ProcessorName = "ConvertJSONAck";
@@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase {
class ReadCallback : public InputStreamCallback {
public:
ReadCallback() = default;
- ~ReadCallback() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t ret = 0;
+ ~ReadCallback() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
if (nullptr == stream)
return 0;
buffer_.resize(stream->size());
- ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
- return ret;
+ const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
+ return gsl::narrow<int64_t>(ret);
}
std::vector<char> buffer_;
};
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index ca5a8d5..69da1fc 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
bool received_update = false;
while (mqtt_service_->get(100, listening_topic, update)) {
// first we have the input topic string followed by the update URI
- if (update.size() > 0) {
+ if (!update.empty()) {
io::BufferStream stream(update.data(), update.size());
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index fdad09f..ba14c0f 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -20,6 +20,8 @@
#ifndef __PUBLISH_MQTT_H__
#define __PUBLISH_MQTT_H__
+#include <limits>
+
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
@@ -50,7 +52,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
max_seg_size_ = ULLONG_MAX;
}
// Destructor
- virtual ~PublishMQTT() = default;
+ ~PublishMQTT() override = default;
// Processor Name
static constexpr char const* ProcessorName = "PublishMQTT";
// Supported Properties
@@ -74,18 +76,20 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
status_ = 0;
read_size_ = 0;
}
- ~ReadCallback() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ ~ReadCallback() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
if (flow_size_ < max_seg_size_)
max_seg_size_ = flow_size_;
+ gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
std::vector<unsigned char> buffer(max_seg_size_);
read_size_ = 0;
status_ = 0;
while (read_size_ < flow_size_) {
- int readRet = stream->read(&buffer[0], max_seg_size_);
+ // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
+ int readRet = static_cast<int>(stream->read(&buffer[0], max_seg_size_));
if (readRet < 0) {
status_ = -1;
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
if (readRet > 0) {
MQTTClient_message pubmsg = MQTTClient_message_initializer;
@@ -97,12 +101,12 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
status_ = -1;
return -1;
}
- read_size_ += readRet;
+ read_size_ += gsl::narrow<size_t>(readRet);
} else {
break;
}
}
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
uint64_t flow_size_;
uint64_t max_seg_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index 1f3fe63..e613d1b 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -428,21 +428,15 @@ namespace processors {
uint64_t size = 0;
do {
- int read = stream->read(buf_.data() + size, 1024);
-
- if (read < 0) {
- return -1;
- }
-
- if (read == 0) {
- break;
- }
+ const auto read = stream->read(buf_.data() + size, 1024);
+ if (read == static_cast<size_t>(-1)) return -1;
+ if (read == 0) break;
size += read;
} while (size < stream->size());
logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size());
- return size;
+ return gsl::narrow<int64_t>(size);
}
} /* namespace processors */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 86daa20..27b342e 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
void RocksDbStream::close() {
}
-void RocksDbStream::seek(uint64_t /*offset*/) {
+void RocksDbStream::seek(size_t /*offset*/) {
// noop
}
@@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) {
}
}
-int RocksDbStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
- if (!exists_) {
- return -1;
- }
- if (buflen == 0) {
- return 0;
- }
- if (!IsNullOrEmpty(buf)) {
- size_t amtToRead = gsl::narrow<size_t>(buflen);
- if (offset_ >= value_.size()) {
- return 0;
- }
- if (amtToRead > value_.size() - offset_) {
- amtToRead = value_.size() - offset_;
- }
- std::memcpy(buf, value_.data() + offset_, amtToRead);
- offset_ += amtToRead;
- return gsl::narrow<int>(amtToRead);
- } else {
- return -1;
- }
+size_t RocksDbStream::read(uint8_t *buf, size_t buflen) {
+ // The checks have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed
+ if (!exists_) return static_cast<size_t>(-1);
+ if (buflen == 0) return 0;
+ if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+ if (offset_ >= value_.size()) return 0;
+
+ const auto amtToRead = std::min(buflen, value_.size() - offset_);
+ std::memcpy(buf, value_.data() + offset_, amtToRead);
+ offset_ += amtToRead;
+ return amtToRead;
}
} /* namespace io */
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 8e47b51..d051802 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return size_;
@@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index fcde480..1733994 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) {
std::vector<uint8_t> buffer(len);
- auto read = stream_->read(buffer.data(), static_cast<int>(len));
- auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
-
- return result;
+ const auto read = stream_->read(buffer.data(), len);
+ return py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
}
-size_t PyBaseStream::write(py::bytes buf) {
+size_t PyBaseStream::write(const py::bytes& buf) {
const auto &&buf_str = buf.operator std::string();
return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())),
static_cast<int>(buf_str.length())));
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h
index f07fe80..1f11065 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyBaseStream.h
@@ -37,7 +37,7 @@ class PyBaseStream {
py::bytes read();
py::bytes read(size_t len = 0);
- size_t write(py::bytes buf);
+ size_t write(const py::bytes& buf);
private:
std::shared_ptr<io::BaseStream> stream_;
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index e657d2e..8d528ab 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
return true;
}
- const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE);
+ const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
std::vector<uint8_t> buf(buf_size);
uint64_t total_read = 0U;
do {
- int read_ret = input.read(buf.data(), buf.size());
- if (read_ret < 0) {
+ const auto read_ret = input.read(buf.data(), buf.size());
+ if (read_ret == static_cast<size_t>(-1)) {
last_error_.setLibssh2Error(LIBSSH2_FX_OK);
logger_->log_error("Error while reading input");
return false;
@@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
}
logger_->log_trace("Read %d bytes", read_ret);
total_read += read_ret;
- ssize_t remaining = read_ret;
+ auto remaining = gsl::narrow<size_t>(read_ret);
while (remaining > 0) {
- int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
+ const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
if (write_ret < 0) {
last_error_.setSftpError(SFTPError::IoError);
logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
return false;
}
logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
- remaining -= write_ret;
+ remaining -= gsl::narrow<size_t>(write_ret);
}
} while (true);
- if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
+ if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
last_error_.setLibssh2Error(LIBSSH2_FX_OK);
logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
return false;
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 345b84a..04e5e91 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
}
int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t ret = 0;
- uint64_t read_size = 0;
+ size_t read_size = 0;
bool regex_mode;
- uint64_t size_limit = flowFile_->getSize();
+ size_t size_limit = flowFile_->getSize();
std::string attrKey, sizeLimitStr;
ctx_->getProperty(Attribute.getName(), attrKey);
@@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
if (sizeLimitStr.empty())
size_limit = DEFAULT_SIZE_LIMIT;
else if (sizeLimitStr != "0")
- size_limit = std::stoi(sizeLimitStr);
+ size_limit = gsl::narrow_cast<size_t>(std::stoi(sizeLimitStr));
std::ostringstream contentStream;
while (read_size < size_limit) {
// Don't read more than config limit or the size of the buffer
- int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size()));
- ret = stream->read(buffer_, length);
+ const auto length = std::min(size_limit - read_size, buffer_.size());
+ const auto ret = stream->read(buffer_, length);
- if (ret < 0) {
+ if (ret == static_cast<size_t>(-1)) {
return -1; // Stream error
} else if (ret == 0) {
break; // End of stream, no more data
}
- contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret);
+ contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
read_size += ret;
if (contentStream.fail()) {
return -1;
@@ -162,9 +161,8 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
bool repeatingcapture;
ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
- int maxCaptureSizeProperty;
- ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty);
- size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty);
+ size_t maxCaptureSize;
+ ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSize);
std::string contentStr = contentStream.str();
@@ -212,7 +210,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
} else {
flowFile_->setAttribute(attrKey, contentStream.str());
}
- return read_size;
+ return gsl::narrow<int64_t>(read_size);
}
ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx, std::shared_ptr<logging::Logger> lgr)
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 6d4b2ef..7a7d920 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -47,14 +47,14 @@ class LogAttribute : public core::Processor {
* Create a new processor
*/
explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {})
- : Processor(std::move(name), uuid),
+ : Processor(name, uuid),
flowfiles_to_log_(1),
hexencode_(false),
max_line_length_(80U),
logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
}
// Destructor
- virtual ~LogAttribute() = default;
+ ~LogAttribute() override = default;
// Processor Name
static constexpr char const* ProcessorName = "LogAttribute";
// Supported Properties
@@ -103,16 +103,14 @@ class LogAttribute : public core::Processor {
: logger_(std::move(logger))
, buffer_(size) {
}
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- if (buffer_.size() == 0U) {
- return 0U;
- }
- int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size()));
- if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) {
- logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ if (buffer_.empty()) return 0U;
+ const auto ret = stream->read(buffer_.data(), buffer_.size());
+ if (ret != buffer_.size()) {
+ logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
}
- return buffer_.size();
+ return gsl::narrow<int64_t>(buffer_.size());
}
std::shared_ptr<logging::Logger> logger_;
std::vector<uint8_t> buffer_;
@@ -123,7 +121,7 @@ class LogAttribute : public core::Processor {
// OnTrigger method, implemented by NiFi LogAttribute
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi LogAttribute
- void initialize(void) override;
+ void initialize() override;
private:
uint64_t flowfiles_to_log_;
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 3765856..2060867 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -25,10 +25,12 @@
#include <memory>
#include <string>
#include <set>
+#include <utility>
#ifdef WIN32
#include <Windows.h>
#endif
#include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
}
#endif
-PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
- : tmp_file_(tmp_file),
- dest_file_(dest_file) {
+PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
+ : tmp_file_(std::move(tmp_file)),
+ dest_file_(std::move(dest_file)) {
}
// Copy the entire file contents to the temporary file
@@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
do {
- int read = stream->read(buffer, 1024);
-
- if (read < 0) {
- return -1;
- }
-
- if (read == 0) {
- break;
- }
-
- tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+ const auto read = stream->read(buffer, 1024);
+ if (read == static_cast<size_t>(-1)) return -1;
+ if (read == 0) break;
+ tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
size += read;
} while (size < stream->size());
@@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
write_succeeded_ = true;
}
- return size;
+ return gsl::narrow<int64_t>(size);
}
// Renames tmp file to final destination
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index c09bffb..487213b 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -82,7 +82,7 @@ class PutFile : public core::Processor {
class ReadCallback : public InputStreamCallback {
public:
- ReadCallback(const std::string &tmp_file, const std::string &dest_file);
+ ReadCallback(std::string tmp_file, std::string dest_file);
~ReadCallback() override;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
bool commit();
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 75b2d4c..e33b3c5 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return length_;
@@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
@@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() {
}
template<typename T>
-void AtomicEntryStream<T>::seek(uint64_t offset) {
+void AtomicEntryStream<T>::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
offset_ = gsl::narrow<size_t>(offset);
}
@@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
}
template<typename T>
-int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
if (nullptr != buf && !invalid_stream_) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
- int len = buflen;
+ auto len = buflen;
core::repository::RepoValue<T> *value;
if (entry_->getValue(key_, &value)) {
if (offset_ + len > value->getBufferSize()) {
@@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
return len;
}
}
- return -1;
+ return static_cast<size_t>(-1);
}
} // namespace io
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 3f0bfdc..7ab2b9e 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -35,7 +35,7 @@ class BufferStream : public BaseStream {
public:
BufferStream() = default;
- BufferStream(const uint8_t *buf, const unsigned int len) {
+ BufferStream(const uint8_t *buf, const size_t len) {
write(buf, len);
}
@@ -48,7 +48,7 @@ class BufferStream : public BaseStream {
int write(const uint8_t* data, int len) final;
- int read(uint8_t* buffer, int len) override;
+ size_t read(uint8_t* buffer, size_t len) override;
int initialize() override {
buffer_.clear();
@@ -56,7 +56,7 @@ class BufferStream : public BaseStream {
return 0;
}
- void seek(uint64_t offset) override {
+ void seek(size_t offset) override {
readOffset_ += offset;
}
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 2204537..abe0762 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
public:
using InputStream::read;
- int read(uint8_t *buf, int buflen) override {
- int ret = child_stream_->read(buf, buflen);
- if (ret > 0) {
+ size_t read(uint8_t *buf, size_t buflen) override {
+ const auto ret = child_stream_->read(buf, buflen);
+ if (ret > 0 && ret != static_cast<size_t>(-1)) {
crc_ = crc32(crc_, buf, ret);
}
return ret;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 37626c8..398a1d5 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -159,7 +159,7 @@ class Socket : public BaseStream {
* @param buflen
* @param retrieve_all_bytes determines if we should read all bytes before returning
*/
- int read(uint8_t *buf, int buflen) override {
+ size_t read(uint8_t *buf, size_t buflen) override {
return read(buf, buflen, true);
}
@@ -169,7 +169,7 @@ class Socket : public BaseStream {
* @param buflen
* @param retrieve_all_bytes determines if we should read all bytes before returning
*/
- virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+ virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes);
protected:
/**
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index a7ea713..ee5aec7 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 3156a30..ad7f0dc 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -63,7 +63,7 @@ class FileStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return length_;
@@ -77,7 +77,7 @@ class FileStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 60a297c..cebf266 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -39,32 +39,32 @@ class InputStream : public virtual Stream {
* reads a byte array from the stream
* @param value reference in which will set the result
* @param len length to read
- * @return resulting read size
+ * @return resulting read size or static_cast<size_t>(-1) on error or static_cast<size_t>(-2) for ClientSocket EAGAIN
**/
- virtual int read(uint8_t *value, int len) = 0;
+ virtual size_t read(uint8_t *value, size_t len) = 0;
- int read(std::vector<uint8_t>& buffer, int len);
+ size_t read(std::vector<uint8_t>& buffer, size_t len);
/**
* read string from stream
* @param str reference string
* @return resulting read size
**/
- int read(std::string &str, bool widen = false);
+ size_t read(std::string &str, bool widen = false);
/**
* read a bool from stream
* @param value reference to the output
* @return resulting read size
**/
- int read(bool& value);
+ size_t read(bool& value);
/**
* read a uuid from stream
* @param value reference to the output
* @return resulting read size
**/
- int read(utils::Identifier& value);
+ size_t read(utils::Identifier& value);
/**
* reads sizeof(Integral) bytes from the stream
@@ -72,10 +72,10 @@ class InputStream : public virtual Stream {
* @return resulting read size
**/
template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
- int read(Integral& value) {
+ size_t read(Integral& value) {
uint8_t buf[sizeof(Integral)]{};
if (read(buf, sizeof(Integral)) != sizeof(Integral)) {
- return -1;
+ return static_cast<size_t>(-1);
}
value = 0;
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e067b52..cb528e5 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -31,7 +31,7 @@ class Stream {
public:
virtual void close() {}
- virtual void seek(uint64_t /*offset*/) {
+ virtual void seek(size_t /*offset*/) {
throw std::runtime_error("Seek is not supported");
}
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b3fafce..8c88568 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
uint8_t buffer[4096U];
int64_t totalTransferred = 0;
while (true) {
- int readRet = src->read(buffer, sizeof(buffer));
- if (readRet < 0) {
- return readRet;
- }
- if (readRet == 0) {
- break;
- }
- int remaining = readRet;
+ const auto readRet = src->read(buffer, sizeof(buffer));
+ if (readRet == static_cast<size_t>(-1)) return -1;
+ if (readRet == 0) break;
+ auto remaining = readRet;
int transferred = 0;
while (remaining > 0) {
int writeRet = dst->write(buffer + transferred, remaining);
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 7699910..5f285c1 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return -1;
@@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index 6bf741d..24df1d9 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -136,14 +136,14 @@ class TLSSocket : public Socket {
using Socket::read;
using Socket::write;
- int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override;
+ size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override;
/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* Write value to the stream using uint8_t ptr
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 3b7b4f7..60ee8c2 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
const auto size = std::min<size_t>(72, bufferSize);
- org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size));
+ org::apache::nifi::minifi::io::BufferStream outStream(buffer, size);
std::string uuid;
- int ret = outStream.read(uuid);
-
- if (ret <= 0) {
+ const auto uuidret = outStream.read(uuid);
+ if (uuidret <= 0) {
return 0;
}
uint32_t eventType;
- ret = outStream.read(eventType);
- if (ret != 4) {
+ const auto typeret = outStream.read(eventType);
+ if (typeret != 4) {
return 0;
}
uint64_t event_time;
-
- ret = outStream.read(event_time);
- if (ret != 8) {
+ const auto timeret = outStream.read(event_time);
+ if (timeret != 8) {
return 0;
}
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index a82e3b4..d2138ea 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
return stream_->write(data, len);
}
- int read(uint8_t* data, int len) override {
+ size_t read(uint8_t* data, size_t len) override {
return stream_->read(data, len);
}
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index fc1dc91..7a41e81 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -19,6 +19,7 @@
#ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
#define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
+#include <algorithm>
#include <map>
#include <memory>
#include <string>
@@ -30,6 +31,7 @@
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
#include "core/Connectable.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -65,11 +67,6 @@ class SiteToSiteClient : public core::Connectable {
_batchSendNanos(5000000000),
ssl_context_service_(nullptr),
logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
- _supportedVersion[0] = 5;
- _supportedVersion[1] = 4;
- _supportedVersion[2] = 3;
- _supportedVersion[3] = 2;
- _supportedVersion[4] = 1;
_currentVersion = _supportedVersion[0];
_currentVersionIndex = 0;
_supportedCodecVersion[0] = 1;
@@ -77,7 +74,7 @@ class SiteToSiteClient : public core::Connectable {
_currentCodecVersionIndex = 0;
}
- virtual ~SiteToSiteClient() = default;
+ ~SiteToSiteClient() override = default;
void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
ssl_context_service_ = context_service;
@@ -189,13 +186,13 @@ class SiteToSiteClient : public core::Connectable {
return logger_;
}
- virtual void yield() {
+ void yield() override {
}
/**
* Determines if we are connected and operating
*/
- virtual bool isRunning() {
+ bool isRunning() override {
return running_;
}
@@ -203,7 +200,7 @@ class SiteToSiteClient : public core::Connectable {
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
- virtual bool isWorkAvailable() {
+ bool isWorkAvailable() override {
return true;
}
@@ -234,12 +231,12 @@ class SiteToSiteClient : public core::Connectable {
virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
// getRespondCodeContext
virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) {
- if (SiteToSiteRequest::respondCodeContext[i].code == code) {
- return &SiteToSiteRequest::respondCodeContext[i];
+ for (auto & i : SiteToSiteRequest::respondCodeContext) {
+ if (i.code == code) {
+ return &i;
}
}
- return NULL;
+ return nullptr;
}
// Peer State
@@ -254,7 +251,7 @@ class SiteToSiteClient : public core::Connectable {
// Peer Connection
std::unique_ptr<SiteToSitePeer> peer_;
- std::atomic<bool> running_;
+ std::atomic<bool> running_{false};
// transaction map
std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
@@ -265,10 +262,10 @@ class SiteToSiteClient : public core::Connectable {
/***
* versioning
*/
- uint32_t _supportedVersion[5];
+ uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
uint32_t _currentVersion;
int _currentVersionIndex;
- uint32_t _supportedCodecVersion[1];
+ uint32_t _supportedCodecVersion[1] = {1};
uint32_t _currentCodecVersion;
int _currentCodecVersionIndex;
@@ -286,13 +283,13 @@ class WriteCallback : public OutputStreamCallback {
}
DataPacket *_packet;
// void process(std::ofstream *stream) {
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
uint8_t buffer[16384];
uint64_t len = _packet->_size;
uint64_t total = 0;
while (len > 0) {
- int size = len < 16384 ? static_cast<int>(len) : 16384;
- int ret = _packet->transaction_->getStream().read(buffer, size);
+ const auto size = std::min(len, size_t{16384});
+ const auto ret = _packet->transaction_->getStream().read(buffer, size);
if (ret != size) {
logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
return -1;
@@ -302,7 +299,7 @@ class WriteCallback : public OutputStreamCallback {
total += size;
}
logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
- return len;
+ return gsl::narrow<int64_t>(len);
}
};
// Nest Callback Class for read stream
@@ -312,21 +309,15 @@ class ReadCallback : public InputStreamCallback {
: _packet(packet) {
}
DataPacket *_packet;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
_packet->_size = 0;
uint8_t buffer[8192] = { 0 };
- int readSize;
size_t size = 0;
do {
- readSize = stream->read(buffer, 8192);
-
- if (readSize == 0) {
- break;
- }
- if (readSize < 0) {
- return -1;
- }
- int ret = _packet->transaction_->getStream().write(buffer, readSize);
+ const auto readSize = stream->read(buffer, 8192);
+ if (readSize == 0) break;
+ if (readSize == static_cast<size_t>(-1)) return -1;
+ const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
if (ret != readSize) {
logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
return -1;
@@ -334,7 +325,7 @@ class ReadCallback : public InputStreamCallback {
size += readSize;
} while (size < stream->size());
_packet->_size = size;
- return size;
+ return gsl::narrow<int64_t>(size);
}
};
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 84e3362..ace72b6 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,6 +24,7 @@
#include "concurrentqueue.h"
#include "FlowFileRecord.h"
#include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback {
if (stream->size() > 0) {
vec.resize(stream->size());
- stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+ stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
}
- return vec.size();
+ return gsl::narrow<int64_t>(vec.size());
}
virtual void seek(size_t) { }
@@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback {
virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
- virtual const std::vector<char> to_string();
+ virtual std::vector<char> to_string();
virtual void close();
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index 440b158..00facc4 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -72,8 +72,8 @@ namespace utils {
#define SMART_ENUM_BODY(Clazz, ...) \
constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
- Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
- Clazz(const char* str) : value_{parse(str).value_} {} \
+ explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+ explicit Clazz(const char* str) : value_{parse(str).value_} {} \
private: \
Type value_; \
public: \
diff --git a/libminifi/include/utils/FileOutputCallback.h b/libminifi/include/utils/FileOutputCallback.h
index 1343fae..ee68db5 100644
--- a/libminifi/include/utils/FileOutputCallback.h
+++ b/libminifi/include/utils/FileOutputCallback.h
@@ -51,7 +51,7 @@ class FileOutputCallback : public ByteOutputCallback {
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
- const std::vector<char> to_string() override;
+ std::vector<char> to_string() override;
void close() override;
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 813137b..047c571 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -161,11 +161,9 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
}
std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
- int ret;
-
auto file = std::make_shared<FlowFileRecord>();
- ret = inStream.read(file->event_time_);
+ auto ret = inStream.read(file->event_time_);
if (ret != 8) {
return {};
}
@@ -181,12 +179,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
}
ret = inStream.read(file->uuid_);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return {};
}
ret = inStream.read(container);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return {};
}
@@ -200,12 +198,12 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
for (uint32_t i = 0; i < numAttributes; i++) {
std::string key;
ret = inStream.read(key, true);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return {};
}
std::string value;
ret = inStream.read(value, true);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return {};
}
file->attributes_[key] = value;
@@ -213,7 +211,7 @@ std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inS
std::string content_full_path;
ret = inStream.read(content_full_path);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return {};
}
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 8322302..73b74ff 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -101,8 +101,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::START:
{
std::string componentStr;
- int size = stream->read(componentStr);
- if ( size != -1 ) {
+ const auto size = stream->read(componentStr);
+ if ( size != static_cast<size_t>(-1) ) {
auto components = update_sink_->getComponents(componentStr);
for (const auto& component : components) {
component->start();
@@ -115,8 +115,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::STOP:
{
std::string componentStr;
- int size = stream->read(componentStr);
- if ( size != -1 ) {
+ const auto size = stream->read(componentStr);
+ if ( size != static_cast<size_t>(-1) ) {
auto components = update_sink_->getComponents(componentStr);
for (const auto& component : components) {
component->stop();
@@ -129,8 +129,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::CLEAR:
{
std::string connection;
- int size = stream->read(connection);
- if ( size != -1 ) {
+ const auto size = stream->read(connection);
+ if ( size != static_cast<size_t>(-1) ) {
update_sink_->clearConnection(connection);
}
}
@@ -138,18 +138,18 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::UPDATE:
{
std::string what;
- int size = stream->read(what);
- if (size == -1) {
+ const auto size = stream->read(what);
+ if (size == static_cast<size_t>(-1)) {
logger_->log_debug("Connection broke");
break;
}
if (what == "flow") {
std::string ff_loc;
- int size = stream->read(ff_loc);
+ const auto size = stream->read(ff_loc);
std::ifstream tf(ff_loc);
std::string configuration((std::istreambuf_iterator<char>(tf)),
std::istreambuf_iterator<char>());
- if (size == -1) {
+ if (size == static_cast<size_t>(-1)) {
logger_->log_debug("Connection broke");
break;
}
@@ -160,15 +160,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::DESCRIBE:
{
std::string what;
- int size = stream->read(what);
- if (size == -1) {
+ const auto size = stream->read(what);
+ if (size == static_cast<size_t>(-1)) {
logger_->log_debug("Connection broke");
break;
}
if (what == "queue") {
std::string connection;
- int size = stream->read(connection);
- if (size == -1) {
+ const auto size_ = stream->read(connection);
+ if (size_ == static_cast<size_t>(-1)) {
logger_->log_debug("Connection broke");
break;
}
@@ -184,8 +184,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
} else if (what == "components") {
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
- resp.write(size);
+ const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
+ resp.write(size_);
for (const auto &component : update_sink_->getAllComponents()) {
resp.write(component->getComponentName());
resp.write(component->isRunning() ? "true" : "false");
@@ -210,8 +210,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
} else if (what == "connections") {
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t size = gsl::narrow<uint16_t>(queue_full_.size());
- resp.write(size);
+ const auto size_ = gsl::narrow<uint16_t>(queue_full_.size());
+ resp.write(size_);
for (const auto &connection : queue_full_) {
resp.write(connection.first, false);
}
@@ -220,17 +220,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
std::vector<std::string> full_connections;
{
std::lock_guard<std::mutex> lock(controller_mutex_);
- for (auto conn : queue_full_) {
- if (conn.second == true) {
+ for (const auto& conn : queue_full_) {
+ if (conn.second) {
full_connections.push_back(conn.first);
}
}
}
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
+ const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
resp.write(full_connection_count);
- for (auto conn : full_connections) {
+ for (const auto& conn : full_connections) {
resp.write(conn);
}
stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size()));
@@ -248,11 +248,11 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
for (const auto &payload_content : content) {
if (payload_content.name == "Components") {
- for (auto content : payload_content.operation_arguments) {
+ for (const auto& content_ : payload_content.operation_arguments) {
bool is_enabled = false;
- minifi::utils::StringUtils::StringToBool(content.second.to_string(), is_enabled);
+ minifi::utils::StringUtils::StringToBool(content_.second.to_string(), is_enabled);
std::lock_guard<std::mutex> lock(controller_mutex_);
- component_map_[content.first] = is_enabled;
+ component_map_[content_.first] = is_enabled;
}
}
}
@@ -261,21 +261,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse
int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
if (server_socket_ == nullptr)
return 0;
- const std::vector<C2ContentResponse> &content = payload.getContent();
for (const auto &pc : payload.getNestedPayloads()) {
if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") {
for (const auto &metrics_payload : pc.getNestedPayloads()) {
if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") {
for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
- auto metric_content = queue_metrics.getContent();
- for (const auto &payload_content : queue_metrics.getContent()) {
+ const auto& metric_content = queue_metrics.getContent();
+ for (const auto &payload_content : metric_content) {
uint64_t size = 0;
uint64_t max = 0;
- for (auto content : payload_content.operation_arguments) {
+ for (const auto& content : payload_content.operation_arguments) {
if (content.first == "datasize") {
- size = std::stol(content.second.to_string());
+ size = std::stoull(content.second.to_string());
} else if (content.first == "datasizemax") {
- max = std::stol(content.second.to_string());
+ max = std::stoull(content.second.to_string());
}
}
std::lock_guard<std::mutex> lock(controller_mutex_);
@@ -293,7 +292,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
}
}
- parse_content(content);
+ parse_content(payload.getContent());
std::vector<uint8_t> buffer;
buffer.resize(1024);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index f8e8c31..6a4077e 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -325,8 +325,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<
*
*/
void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
- std::shared_ptr<ResourceClaim> claim = content_session_->create();
- int max_read = getpagesize();
+ const std::shared_ptr<ResourceClaim> claim = content_session_->create();
+ const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
std::vector<uint8_t> charBuffer(max_read);
try {
@@ -336,10 +336,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
if (nullptr == content_stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
}
- int position = 0;
- const int max_size = gsl::narrow<int>(stream.size());
+ size_t position = 0;
+ const auto max_size = stream.size();
while (position < max_size) {
- const int read_size = std::min(max_read, max_size - position);
+ const auto read_size = std::min(max_read, max_size - position);
stream.read(charBuffer, read_size);
content_stream->write(charBuffer.data(), read_size);
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 8a21f73..d62fac8 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,6 +24,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "io/BaseStream.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
size_t size = 0;
uint8_t buffer[8192];
do {
- int read = stream->read(buffer, 8192);
- if (read < 0) {
- return -1;
- }
- if (read == 0) {
- break;
- }
+ const auto read = stream->read(buffer, 8192);
+ if (read == static_cast<size_t>(-1)) return -1;
+ if (read == 0) break;
if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
return -1;
}
size += read;
} while (size < stream->size());
_writeSucceeded = true;
- return size;
+ return gsl::narrow<int64_t>(size);
}
// Renames tmp file to final destination
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 6d3ebab..12220ef 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) {
return size;
}
-int BufferStream::read(uint8_t *buf, int len) {
- gsl_Expects(len >= 0);
- int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_);
- len = std::min(len, bytes_available_in_buffer);
- auto begin = buffer_.begin() + readOffset_;
- std::copy(begin, begin + len, buf);
+size_t BufferStream::read(uint8_t *buf, size_t len) {
+ const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
+ const auto readlen = std::min(len, bytes_available_in_buffer);
+ auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+ std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
// increase offset for the next read
- readOffset_ += len;
+ readOffset_ += readlen;
- return len;
+ return readlen;
}
} /* namespace io */
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 6427447..cc65a8b 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -47,6 +47,8 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/file/FileUtils.h"
#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+
namespace util = org::apache::nifi::minifi::utils;
namespace mio = org::apache::nifi::minifi::io;
@@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) {
return bytes;
}
-int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
- gsl_Expects(buflen >= 0);
- int32_t total_read = 0;
+size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) {
+ size_t total_read = 0;
while (buflen) {
int16_t fd = select_descriptor(1000);
if (fd < 0) {
@@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
logger_->log_debug("fd %d close %i", fd, buflen);
utils::file::FileUtils::close(socket_file_descriptor_);
}
- return -1;
+ return static_cast<size_t>(-1);
}
- int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
+ const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
logger_->log_trace("Recv call %d", bytes_read);
if (bytes_read <= 0) {
if (bytes_read == 0) {
@@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
// continue
- return -2;
+ return static_cast<size_t>(-2);
}
logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err);
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// continue
- return -2;
+ return static_cast<size_t>(-2);
}
logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno));
#endif // WIN32
}
- return -1;
+ return static_cast<size_t>(-1);
}
- buflen -= bytes_read;
+ buflen -= gsl::narrow<size_t>(bytes_read);
buf += bytes_read;
- total_read += bytes_read;
+ total_read += gsl::narrow<size_t>(bytes_read);
if (!retrieve_all_bytes) {
break;
}
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 2b5fc70..6330b5e 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd)
logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
}
-void DescriptorStream::seek(uint64_t offset) {
+void DescriptorStream::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
#ifdef WIN32
_lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) {
}
}
-int DescriptorStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t DescriptorStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
if (!IsNullOrEmpty(buf)) {
#ifdef WIN32
- auto size_read = _read(fd_, buf, buflen);
+ const auto size_read = _read(fd_, buf, buflen);
#else
- auto size_read = ::read(fd_, buf, buflen);
+ const auto size_read = ::read(fd_, buf, buflen);
#endif
if (size_read < 0) {
- return -1;
+ return static_cast<size_t>(-1);
}
- return size_read;
+ return gsl::narrow<size_t>(size_read);
} else {
- return -1;
+ return static_cast<size_t>(-1);
}
}
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 4f759ae..c88f8dd 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -97,13 +97,13 @@ void FileStream::close() {
file_stream_.reset();
}
-void FileStream::seek(uint64_t offset) {
+void FileStream::seek(size_t offset) {
std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_ == nullptr || !file_stream_->is_open()) {
logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
return;
}
- offset_ = gsl::narrow<size_t>(offset);
+ offset_ = offset;
file_stream_->clear();
if (!file_stream_->seekg(offset_))
logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG;
@@ -142,8 +142,7 @@ int FileStream::write(const uint8_t *value, int size) {
}
}
-int FileStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t FileStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
@@ -151,32 +150,31 @@ int FileStream::read(uint8_t *buf, int buflen) {
std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_ == nullptr || !file_stream_->is_open()) {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
- return -1;
+ return static_cast<size_t>(-1);
}
- file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+ file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen));
if (file_stream_->eof() || file_stream_->fail()) {
file_stream_->clear();
seekToEndOfFile(READ_ERROR_MSG);
auto tellg_result = file_stream_->tellg();
if (tellg_result == std::streampos(-1)) {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG;
- return -1;
+ return static_cast<size_t>(-1);
}
- size_t len = gsl::narrow<size_t>(tellg_result);
+ const auto len = gsl::narrow<size_t>(tellg_result);
size_t ret = len - offset_;
offset_ = len;
length_ = len;
logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
- return gsl::narrow<int>(ret);
+ return ret;
} else {
offset_ += buflen;
- file_stream_->seekp(offset_);
+ file_stream_->seekp(gsl::narrow<std::streamoff>(offset_));
return buflen;
}
-
} else {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG;
- return -1;
+ return static_cast<size_t>(-1);
}
}
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cdd101e..cd4166b 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -30,42 +30,43 @@ namespace nifi {
namespace minifi {
namespace io {
-int InputStream::read(std::vector<uint8_t>& buffer, int len) {
- if (buffer.size() < gsl::narrow<size_t>(len)) {
+size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
+ if (buffer.size() < len) {
buffer.resize(len);
}
- int ret = read(buffer.data(), len);
- buffer.resize((std::max)(ret, 0));
+ const auto ret = read(buffer.data(), len);
+ if (ret == static_cast<size_t>(-1)) return ret;
+ buffer.resize((std::max)(ret, size_t{0}));
return ret;
}
-int InputStream::read(bool &value) {
+size_t InputStream::read(bool &value) {
uint8_t buf = 0;
if (read(&buf, 1) != 1) {
- return -1;
+ return static_cast<size_t>(-1);
}
value = buf;
return 1;
}
-int InputStream::read(utils::Identifier &value) {
+size_t InputStream::read(utils::Identifier &value) {
std::string uuidStr;
- int ret = read(uuidStr);
- if (ret < 0) {
+ const auto ret = read(uuidStr);
+ if (ret == static_cast<size_t>(-1)) {  // ret is unsigned: "ret < 0" can never be true, so test the error sentinel
return ret;
}
auto optional_uuid = utils::Identifier::parse(uuidStr);
if (!optional_uuid) {
- return -1;
+ return static_cast<size_t>(-1);
}
value = optional_uuid.value();
return ret;
}
-int InputStream::read(std::string &str, bool widen) {
+size_t InputStream::read(std::string &str, bool widen) {
uint32_t len = 0;
- int ret = 0;
+ size_t ret = 0;
if (!widen) {
uint16_t shortLength = 0;
ret = read(shortLength);
@@ -84,9 +85,9 @@ int InputStream::read(std::string &str, bool widen) {
}
std::vector<uint8_t> buffer(len);
- uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
+ const auto bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
if (bytes_read != len) {
- return -1;
+ return static_cast<size_t>(-1);
}
str = std::string(reinterpret_cast<const char*>(buffer.data()), len);
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c5b0b3f..66d23b5 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -34,7 +34,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl)
logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) {
}
-void SecureDescriptorStream::seek(uint64_t offset) {
+void SecureDescriptorStream::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
#ifdef WIN32
_lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -71,34 +71,29 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) {
}
}
-int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
if (buflen == 0) {
return 0;
}
- if (!IsNullOrEmpty(buf)) {
- int total_read = 0;
- int status = 0;
- while (buflen) {
- int sslStatus;
- do {
- status = SSL_read(ssl_, buf, buflen);
- sslStatus = SSL_get_error(ssl_, status);
- } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
+ if (IsNullOrEmpty(buf)) return static_cast<size_t>(-1);
+ size_t total_read = 0;
+ uint8_t* writepos = buf;
+ while (buflen > total_read) {
+ int status;
+ int sslStatus;
+ do {
+ status = SSL_read(ssl_, writepos, gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
+ sslStatus = SSL_get_error(ssl_, status);
+ } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
- if (status < 0)
- break;
+ if (status < 0)
+ break;
- buflen -= status;
- buf += status;
- total_read += status;
- }
-
- return total_read;
-
- } else {
- return -1;
+ writepos += status;
+ total_read += gsl::narrow<size_t>(status);
}
+
+ return total_read;
}
} /* namespace io */
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index 776fa44..49d07ec 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -351,36 +351,37 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
return -1;
}
-int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) {
- gsl_Expects(buflen >= 0);
- int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) {
+ size_t total_read = 0;
int status = 0;
int loc = 0;
int16_t fd = select_descriptor(1000);
if (fd < 0) {
close();
- return -1;
+ return static_cast<size_t>(-1);
}
auto fd_ssl = get_ssl(fd);
if (IsNullOrEmpty(fd_ssl)) {
- return -1;
+ return static_cast<size_t>(-1);
}
if (!SSL_pending(fd_ssl)) {
return 0;
}
while (buflen) {
if (fd <= 0) {
- return -1;
+ return static_cast<size_t>(-1);
}
int sslStatus;
do {
- status = SSL_read(fd_ssl, buf + loc, buflen);
+ status = SSL_read(fd_ssl, buf + loc, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
sslStatus = SSL_get_error(fd_ssl, status);
} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
- buflen -= status;
+ if (status < 0) break;
+
+ buflen -= gsl::narrow<size_t>(status);
loc += status;
- total_read += status;
+ total_read += gsl::narrow<size_t>(status);
}
return total_read;
@@ -418,33 +419,32 @@ int TLSSocket::write(const uint8_t *value, int size) {
return writeData(value, size, fd);
}
-int TLSSocket::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
- int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
+ size_t total_read = 0;
int status = 0;
while (buflen) {
- int16_t fd = select_descriptor(1000);
+ const int16_t fd = select_descriptor(1000);
if (fd < 0) {
close();
- return -1;
+ return static_cast<size_t>(-1);
}
int sslStatus;
do {
auto fd_ssl = get_ssl(fd);
if (IsNullOrEmpty(fd_ssl)) {
- return -1;
+ return static_cast<size_t>(-1);
}
- status = SSL_read(fd_ssl, buf, buflen);
+ status = SSL_read(fd_ssl, buf, gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max()))));
sslStatus = SSL_get_error(fd_ssl, status);
} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
if (status < 0)
break;
- buflen -= status;
+ buflen -= gsl::narrow<size_t>(status);
buf += status;
- total_read += status;
+ total_read += gsl::narrow<size_t>(status);
}
return total_read;
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f9af7f9..2fac08b 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -235,12 +235,10 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
}
bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
- int ret;
-
org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
- ret = outStream.read(uuid_);
- if (ret <= 0) {
+ auto ret = outStream.read(uuid_);
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
@@ -272,22 +270,22 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
}
ret = outStream.read(this->_componentId);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
ret = outStream.read(this->_componentType);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
ret = outStream.read(this->flow_uuid_);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
ret = outStream.read(this->_details);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
@@ -301,19 +299,19 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
for (uint32_t i = 0; i < numAttributes; i++) {
std::string key;
ret = outStream.read(key);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
std::string value;
ret = outStream.read(value);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
this->_attributes[key] = value;
}
ret = outStream.read(this->_contentFullPath);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
@@ -328,7 +326,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
}
ret = outStream.read(this->_sourceQueueIdentifier);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
@@ -343,7 +341,7 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
for (uint32_t i = 0; i < number; i++) {
utils::Identifier parentUUID;
ret = outStream.read(parentUUID);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
this->addParentUuid(parentUUID);
@@ -356,23 +354,23 @@ bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t buff
for (uint32_t i = 0; i < number; i++) {
utils::Identifier childUUID;
ret = outStream.read(childUUID);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
this->addChildUuid(childUUID);
}
} else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
ret = outStream.read(this->_transitUri);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
} else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
ret = outStream.read(this->_transitUri);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
- if (ret <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
return false;
}
}
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7ea213f..0eb1f7e 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -342,40 +342,40 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
}
uint32_t number;
- status = peer_->read(number);
+ auto ret = peer_->read(number);
- if (status <= 0) {
+ if (ret == static_cast<size_t>(-1)) {
tearDown();
return false;
}
for (uint32_t i = 0; i < number; i++) {
std::string host;
- status = peer_->read(host);
- if (status <= 0) {
+ ret = peer_->read(host);
+ if (ret == static_cast<size_t>(-1)) {
tearDown();
return false;
}
uint32_t port;
- status = peer_->read(port);
- if (status <= 0) {
+ ret = peer_->read(port);
+ if (ret == static_cast<size_t>(-1)) {
tearDown();
return false;
}
uint8_t secure;
- status = peer_->read(secure);
- if (status <= 0) {
+ ret = peer_->read(secure);
+ if (ret == static_cast<size_t>(-1)) {
tearDown();
return false;
}
uint32_t count;
- status = peer_->read(count);
- if (status <= 0) {
+ ret = peer_->read(count);
+ if (ret == static_cast<size_t>(-1)) {
tearDown();
return false;
}
- PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
- peers.push_back(std::move(status));
+ PeerStatus peer_status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
+ peers.push_back(std::move(peer_status));
logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
}
@@ -397,15 +397,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
int RawSiteToSiteClient::readRequestType(RequestType &type) {
std::string requestTypeStr;
- int ret = peer_->read(requestTypeStr);
-
- if (ret <= 0)
- return ret;
+ const auto ret = peer_->read(requestTypeStr);
+ if (ret == static_cast<size_t>(-1))
+ return gsl::narrow_cast<int>(ret);
for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
type = (RequestType) i;
- return ret;
+ return gsl::narrow_cast<int>(ret);
}
}
@@ -417,7 +416,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac
}
int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
- return writeResponse(transaction, code, message);
+ return writeResponse(transaction, code, std::move(message));
}
bool RawSiteToSiteClient::negotiateCodec() {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 1833144..fb24f73 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -32,7 +32,7 @@ namespace sitetosite {
int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
uint8_t firstByte;
- int ret = peer_->read(firstByte);
- if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
+ auto ret = peer_->read(firstByte);
+ if (ret == 0 || ret == static_cast<size_t>(-1) || firstByte != CODE_SEQUENCE_VALUE_1)  // unsigned ret: "<= 0" would miss the error sentinel
return -1;
@@ -48,27 +48,25 @@ int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transac
ret = peer_->read(thirdByte);
- if (ret <= 0)
- return ret;
+ if (ret == 0 || ret == static_cast<size_t>(-1))  // bail out on failure only (was inverted: "!=" returned on success)
+ return gsl::narrow_cast<int>(ret);
code = (RespondCode) thirdByte;
RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
- if (resCode == NULL) {
- // Not a valid respond code
+ if (!resCode) {
return -1;
}
if (resCode->hasDescription) {
ret = peer_->read(message);
- if (ret <= 0)
+ if (ret == 0 || ret == static_cast<size_t>(-1))  // failure check, matching the old "ret <= 0"
return -1;
}
return gsl::narrow<int>(3 + message.size());
}
void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
auto it = this->known_transactions_.find(transactionID);
@@ -85,7 +83,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (resCode == NULL) {
+ if (!resCode) {
// Not a valid respond code
return -1;
}
@@ -205,7 +203,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
int ret;
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
if (peer_state_ != READY) {
bootstrap();
@@ -529,8 +527,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
}
bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
- int ret;
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
if (peer_state_ != READY) {
bootstrap();
@@ -568,9 +565,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
RespondCode code;
std::string message;
- ret = readResponse(transaction, code, message);
-
- if (ret <= 0) {
+ if (readResponse(transaction, code, message) <= 0) {
return false;
}
if (code == CONTINUE_TRANSACTION) {
@@ -595,8 +590,8 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
// start to read the packet
uint32_t numAttributes;
- ret = transaction->getStream().read(numAttributes);
- if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
+ auto ret = transaction->getStream().read(numAttributes);
+ if (ret == 0 || ret == static_cast<size_t>(-1) || numAttributes > MAX_NUM_ATTRIBUTES) {  // fail on read error, not on success
return false;
}
@@ -606,11 +601,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
std::string key;
std::string value;
ret = transaction->getStream().read(key, true);
- if (ret <= 0) {
+ if (ret == 0 || ret == static_cast<size_t>(-1)) {  // same semantics as the old "ret <= 0"
return false;
}
ret = transaction->getStream().read(value, true);
- if (ret <= 0) {
+ if (ret == 0 || ret == static_cast<size_t>(-1)) {  // same semantics as the old "ret <= 0"
return false;
}
packet->_attributes[key] = value;
@@ -619,7 +614,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
uint64_t len;
ret = transaction->getStream().read(len);
- if (ret <= 0) {
+ if (ret == 0 || ret == static_cast<size_t>(-1)) {  // same semantics as the old "ret <= 0"
return false;
}
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 33c9634..a64c3ac 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
if (stream->size() > 0) {
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
readFully(buffer.get(), stream->size());
- stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size()));
- return stream->size();
+ stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size());
+ return gsl::narrow<int64_t>(stream->size());
}
- return size_.load();
+ return gsl::narrow<int64_t>(size_.load());
}
int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
auto written = readFully(buffer.get(), size_);
stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written));
- return stream->size();
+ return gsl::narrow<int64_t>(stream->size());
}
void StreamOutputCallback::write(char *data, size_t size) {
@@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) {
write_and_notify(data, size);
}
-const std::vector<char> ByteOutputCallback::to_string() {
+std::vector<char> ByteOutputCallback::to_string() {
std::vector<char> buffer;
buffer.resize(size_.load());
readFully(buffer.data(), size_.load());
diff --git a/libminifi/src/utils/FileOutputCallback.cpp b/libminifi/src/utils/FileOutputCallback.cpp
index 6dfa565..bbd1265 100644
--- a/libminifi/src/utils/FileOutputCallback.cpp
+++ b/libminifi/src/utils/FileOutputCallback.cpp
@@ -34,7 +34,7 @@ int64_t FileOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
return size_.load();
}
-const std::vector<char> FileOutputCallback::to_string() {
+std::vector<char> FileOutputCallback::to_string() {
std::vector<char> buffer;
buffer.insert(std::end(buffer), std::begin(file_), std::end(file_));
return buffer;
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index 2b4bf73..312ca76 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -26,14 +26,14 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
public:
explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer){}
- int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+ size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
uint8_t tmpBuffer[4096]{};
std::size_t remaining_len = len;
- int total_read = 0;
+ size_t total_read = 0;
while (remaining_len > 0) {
- auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer))));
+ const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (ret == static_cast<size_t>(-1)) return ret;
remaining_len -= ret;
total_read += ret;
auto prevSize = buffer_.size();
@@ -44,7 +44,7 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
}
int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
- return write(*stream.get(), stream->size());
+ return static_cast<int64_t>(write(*stream.get(), stream->size()));
}
private:
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 76507df..3d0da83 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,29 +41,23 @@
#include "processors/PutFile.h"
#include "utils/file/FileUtils.h"
#include "../Utils.h"
+#include "utils/gsl.h"
class ReadCallback: public minifi::InputStreamCallback {
public:
- explicit ReadCallback(size_t size) :
- read_size_(0) {
- buffer_size_ = size;
- buffer_ = new uint8_t[buffer_size_];
- archive_buffer_ = nullptr;
- archive_buffer_size_ = 0;
- }
- ~ReadCallback() {
- if (buffer_)
- delete[] buffer_;
- if (archive_buffer_)
- delete[] archive_buffer_;
- }
- int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+ explicit ReadCallback(size_t size)
+ :buffer_size_{size}
+ {}
+ ~ReadCallback() override {
+ delete[] buffer_;
+ delete[] archive_buffer_;
+ }
+ int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
int64_t total_read = 0;
- int64_t ret = 0;
do {
- ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_));
+ const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (ret == static_cast<size_t>(-1)) return -1;
read_size_ += gsl::narrow<size_t>(ret);
total_read += ret;
} while (buffer_size_ != read_size_);
@@ -78,18 +72,18 @@ class ReadCallback: public minifi::InputStreamCallback {
struct archive_entry *ae;
REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
- int size = gsl::narrow<int>(archive_entry_size(ae));
+ const auto size = archive_entry_size(ae);
archive_buffer_ = new char[size];
archive_buffer_size_ = size;
- archive_read_data(a, archive_buffer_, size);
+ archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size));
archive_read_free(a);
}
- uint8_t *buffer_;
size_t buffer_size_;
- size_t read_size_;
- char *archive_buffer_;
- int archive_buffer_size_;
+ uint8_t *buffer_ = new uint8_t[buffer_size_];
+ size_t read_size_ = 0;
+ char *archive_buffer_ = nullptr;
+ int64_t archive_buffer_size_ = 0;
};
/**
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 60e233f..074b153 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback {
REQUIRE(size_ + len <= capacity_);
int total_read = 0;
do {
- auto ret = input.read(end(), gsl::narrow<int>(len));
+ const auto ret = input.read(end(), len);
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (ret == static_cast<size_t>(-1)) return -1;
size_ += ret;
len -= ret;
total_read += ret;
@@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
return total_read;
}
int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
- return write(*stream.get(), capacity_);
+ return write(*stream, capacity_);
}
private:
@@ -111,8 +111,8 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
class ArchiveEntryReader {
public:
explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
- int read(uint8_t* out, std::size_t len) {
- return gsl::narrow<int>(archive_read_data(arch, out, len));
+ size_t read(uint8_t* out, std::size_t len) {
+ return gsl::narrow_cast<size_t>(archive_read_data(arch, out, len));
}
private:
archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 059dfa9..d2e6d51 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -56,12 +56,11 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
str = "";
uint8_t buffer[4096]{};
while (true) {
- auto ret = stream->read(buffer, sizeof(buffer));
+ const auto ret = stream->read(buffer, sizeof(buffer));
REQUIRE(ret >= 0);
- if (ret == 0) {
- break;
- }
- str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)};
+ REQUIRE(ret != static_cast<size_t>(-1));
+ if (ret == 0) { break; }
+ str += std::string{reinterpret_cast<char*>(buffer), ret};
}
return stream;
}
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 6409f3e..7b5c75c 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -104,7 +104,7 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
std::string readstr;
// -1 tell us we have an invalid stream
- REQUIRE(read_stream->read(readstr) == -1);
+ REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
}
TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -140,7 +140,7 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
std::string readstr;
// -1 tell us we have an invalid stream
- REQUIRE(read_stream->read(readstr) == -1);
+ REQUIRE(read_stream->read(readstr) == static_cast<size_t>(-1));
}
TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 88b3929..5e7c3b6 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
- REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+ REQUIRE(nonExistingStream.read(nullptr, 0) == static_cast<size_t>(-1));
}
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index b13dcfa..1ff4ab9 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1);
+ REQUIRE(stream.read(nullptr, stream.size()) == static_cast<size_t>(-1));
data = verifybuffer.data();
- REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty());
std::remove(ss.str().c_str());
}
@@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
std::vector<uint8_t> readBuffer;
stream.seek(0);
- REQUIRE(stream.read(readBuffer, 1) == -1);
+ REQUIRE(stream.read(readBuffer, 1) == static_cast<size_t>(-1));
REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
}
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
std::vector<uint8_t> readBuffer;
stream.seek(0);
- REQUIRE_FALSE(stream.read(readBuffer, 11) == -1);
+ REQUIRE_FALSE(stream.read(readBuffer, 11) == static_cast<size_t>(-1));
REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
stream.seek(0);
- REQUIRE(stream.read(nullptr, 11) == -1);
+ REQUIRE(stream.read(nullptr, 11) == static_cast<size_t>(-1));
REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
}
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index b4d5e8f..61257e2 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -59,7 +59,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
* @param len length to read
* @return resulting read size
**/
- int read(uint8_t *value, int len) override {
+ size_t read(uint8_t *value, size_t len) override {
return server_responses_.read(value, len);
}
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index b339a28..ca60e7f 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
*content_repo);
auto stream = (*content_repo)->read(*claim);
- return stream->read(target, size);
+ return static_cast<int>(stream->read(target, static_cast<size_t>(size)));
} else {
file_buffer fb = file_to_buffer(ff->contentLocation);
if (size < 0) {
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index a1f8a08..ad3c086 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -39,6 +39,7 @@
#include "core/cstructs.h"
#include "RandomServerSocket.h"
#include "core/log.h"
+#include "utils/gsl.h"
#define FMT_DEFAULT fmt_lower
@@ -145,15 +146,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
size_t read_len = 0;
while(!found_codec) {
uint8_t handshake_data[1000];
- int actual_len = stream->read(handshake_data+read_len, 1000-read_len);
- if(actual_len <= 0) {
+ const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
+ if(actual_len == 0 || actual_len == static_cast<size_t>(-1)) {
continue;
}
read_len += actual_len;
- std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
- auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
+ const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
+ const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
if(it != incoming_data.end()){
- size_t idx = std::distance(incoming_data.begin(), it);
+ const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it));
// Actual version follows the string as an uint32_t // that should be the end of the buffer
found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
}