You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2021/06/15 13:42:31 UTC
[nifi-minifi-cpp] 01/02: MINIFICPP-1507 convert InputStream::read
to size_t
This is an automated email from the ASF dual-hosted git repository.
fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 13a43e678e445a88726ac7d56b1670e717cf73ba
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Wed May 26 16:02:06 2021 +0200
MINIFICPP-1507 convert InputStream::read to size_t
Co-authored-by: adamdebreceni <64...@users.noreply.github.com>
Co-authored-by: Ferenc Gerlits <fg...@users.noreply.github.com>
Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
This closes #1028
---
extensions/aws/processors/PutS3Object.h | 11 +-
extensions/civetweb/processors/ListenHTTP.h | 13 +-
extensions/coap/protocols/CoapC2Protocol.h | 2 +-
extensions/http-curl/client/HTTPCallback.h | 2 +-
extensions/http-curl/client/HTTPStream.cpp | 9 +-
extensions/http-curl/client/HTTPStream.h | 4 +-
extensions/http-curl/tests/CivetStream.h | 8 +-
extensions/http-curl/tests/HTTPHandlers.h | 50 +++--
extensions/jni/jvm/JniReferenceObjects.h | 3 +-
extensions/libarchive/CompressContent.h | 79 ++++----
extensions/libarchive/FocusArchiveEntry.cpp | 9 +-
extensions/librdkafka/PublishKafka.cpp | 7 +-
extensions/mqtt/processors/ConvertJSONAck.h | 13 +-
extensions/mqtt/processors/ConvertUpdate.cpp | 2 +-
extensions/mqtt/processors/PublishMQTT.h | 23 ++-
extensions/opc/src/putopc.cpp | 15 +-
extensions/rocksdb-repos/RocksDbStream.cpp | 35 ++--
extensions/rocksdb-repos/RocksDbStream.h | 4 +-
extensions/script/lua/LuaBaseStream.cpp | 15 +-
extensions/script/python/PyBaseStream.cpp | 8 +-
extensions/script/python/PyBaseStream.h | 2 +-
extensions/sftp/client/SFTPClient.cpp | 14 +-
.../processors/ExecuteProcess.cpp | 17 +-
.../standard-processors/processors/ExtractText.cpp | 27 +--
.../standard-processors/processors/GetTCP.cpp | 12 +-
.../standard-processors/processors/LogAttribute.h | 20 +-
.../standard-processors/processors/PutFile.cpp | 25 +--
.../standard-processors/processors/PutFile.h | 2 +-
.../standard-processors/processors/TailFile.cpp | 2 +-
extensions/tensorflow/TFApplyGraph.cpp | 20 +-
extensions/tensorflow/TFConvertImageToTensor.cpp | 6 +-
extensions/tensorflow/TFExtractTopLabels.cpp | 19 +-
libminifi/include/io/AtomicEntryStream.h | 13 +-
libminifi/include/io/BufferStream.h | 6 +-
libminifi/include/io/CRCStream.h | 6 +-
libminifi/include/io/ClientSocket.h | 4 +-
libminifi/include/io/DescriptorStream.h | 4 +-
libminifi/include/io/FileStream.h | 4 +-
libminifi/include/io/InputStream.h | 38 ++--
libminifi/include/io/Stream.h | 13 +-
libminifi/include/io/StreamPipe.h | 12 +-
libminifi/include/io/tls/SecureDescriptorStream.h | 4 +-
libminifi/include/io/tls/TLSSocket.h | 4 +-
libminifi/include/provenance/Provenance.h | 16 +-
libminifi/include/sitetosite/Peer.h | 2 +-
libminifi/include/sitetosite/SiteToSiteClient.h | 95 ++++------
libminifi/include/utils/ByteArrayCallback.h | 7 +-
libminifi/include/utils/Enum.h | 4 +-
libminifi/src/FlowControlProtocol.cpp | 22 +--
libminifi/src/FlowFileRecord.cpp | 90 +++++----
libminifi/src/c2/ControllerSocketProtocol.cpp | 77 ++++----
libminifi/src/core/ProcessSession.cpp | 10 +-
libminifi/src/core/ProcessSessionReadCallback.cpp | 13 +-
libminifi/src/io/BufferStream.cpp | 15 +-
libminifi/src/io/ClientSocket.cpp | 21 ++-
libminifi/src/io/DescriptorStream.cpp | 15 +-
libminifi/src/io/FileStream.cpp | 24 +--
libminifi/src/io/InputStream.cpp | 58 +++---
libminifi/src/io/tls/SecureDescriptorStream.cpp | 44 +++--
libminifi/src/io/tls/TLSSocket.cpp | 40 ++--
libminifi/src/provenance/Provenance.cpp | 196 ++++++++++++--------
libminifi/src/sitetosite/RawSocketProtocol.cpp | 206 +++++++++++----------
libminifi/src/sitetosite/SiteToSiteClient.cpp | 85 ++++-----
libminifi/src/utils/ByteArrayCallback.cpp | 10 +-
libminifi/test/BufferReader.h | 12 +-
.../test/archive-tests/CompressContentTests.cpp | 45 +++--
libminifi/test/archive-tests/MergeFileTests.cpp | 11 +-
.../test/rocksdb-tests/ContentSessionTests.cpp | 10 +-
.../rocksdb-tests/DBContentRepositoryTests.cpp | 10 +-
.../test/rocksdb-tests/RocksDBStreamTests.cpp | 2 +-
libminifi/test/unit/FileStreamTests.cpp | 28 +--
libminifi/test/unit/SiteToSiteHelper.h | 2 +-
nanofi/src/api/nanofi.cpp | 2 +-
nanofi/tests/CSite2SiteTests.cpp | 13 +-
74 files changed, 912 insertions(+), 859 deletions(-)
diff --git a/extensions/aws/processors/PutS3Object.h b/extensions/aws/processors/PutS3Object.h
index af983db..fb14d2b 100644
--- a/extensions/aws/processors/PutS3Object.h
+++ b/extensions/aws/processors/PutS3Object.h
@@ -31,6 +31,7 @@
#include "S3Processor.h"
#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
template<typename T>
class S3TestsFixture;
@@ -94,20 +95,20 @@ class PutS3Object : public S3Processor {
auto data_stream = std::make_shared<std::stringstream>();
read_size_ = 0;
while (read_size_ < flow_size_) {
- auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
- int read_ret = stream->read(buffer, next_read_size);
- if (read_ret < 0) {
+ const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
+ const auto read_ret = stream->read(buffer, next_read_size);
+ if (io::isError(read_ret)) {
return -1;
}
if (read_ret > 0) {
- data_stream->write(reinterpret_cast<char*>(buffer.data()), next_read_size);
+ data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
read_size_ += read_ret;
} else {
break;
}
}
result_ = s3_wrapper_.putObject(options_, data_stream);
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
uint64_t flow_size_;
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index 57cc1dd..de522c3 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -34,6 +34,7 @@
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/MinifiConcurrentQueue.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -58,7 +59,7 @@ class ListenHTTP : public core::Processor {
callbacks_.log_access = &logAccess;
}
// Destructor
- virtual ~ListenHTTP();
+ ~ListenHTTP() override;
// Processor Name
static constexpr char const *ProcessorName = "ListenHTTP";
// Supported Properties
@@ -133,14 +134,12 @@ class ListenHTTP : public core::Processor {
}
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
out_str_->resize(stream->size());
- uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
- gsl::narrow<int>(stream->size()));
-
+ const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
}
- return num_read;
+ return gsl::narrow<int64_t>(num_read);
}
private:
@@ -161,11 +160,11 @@ class ListenHTTP : public core::Processor {
try {
struct mg_context* ctx = mg_get_context(conn);
/* CivetServer stores 'this' as the userdata when calling mg_start */
- CivetServer* server = static_cast<CivetServer*>(mg_get_user_data(ctx));
+ auto* const server = static_cast<CivetServer*>(mg_get_user_data(ctx));
if (server == nullptr) {
return 0;
}
- std::shared_ptr<logging::Logger>* logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
+ auto* const logger = static_cast<std::shared_ptr<logging::Logger>*>(const_cast<void*>(server->getUserContext()));
if (logger == nullptr) {
return 0;
}
diff --git a/extensions/coap/protocols/CoapC2Protocol.h b/extensions/coap/protocols/CoapC2Protocol.h
index 03ce24b..c19ffd5 100644
--- a/extensions/coap/protocols/CoapC2Protocol.h
+++ b/extensions/coap/protocols/CoapC2Protocol.h
@@ -49,7 +49,7 @@ namespace coap {
namespace c2 {
#define REQUIRE_VALID(x) \
- if (-1 == x) { \
+ if (io::isError(x)) { \
return minifi::c2::C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR); \
}
diff --git a/extensions/http-curl/client/HTTPCallback.h b/extensions/http-curl/client/HTTPCallback.h
index 3a73cd5..60ed3b3 100644
--- a/extensions/http-curl/client/HTTPCallback.h
+++ b/extensions/http-curl/client/HTTPCallback.h
@@ -84,7 +84,7 @@ class HttpStreamingCallback : public ByteInputCallBack {
if (stream->size() > 0) {
vec.resize(stream->size());
- stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+ stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
}
return processInner(std::move(vec));
diff --git a/extensions/http-curl/client/HTTPStream.cpp b/extensions/http-curl/client/HTTPStream.cpp
index 218d200..9366f91 100644
--- a/extensions/http-curl/client/HTTPStream.cpp
+++ b/extensions/http-curl/client/HTTPStream.cpp
@@ -47,7 +47,7 @@ void HttpStream::close() {
http_read_callback_.close();
}
-void HttpStream::seek(uint64_t /*offset*/) {
+void HttpStream::seek(size_t /*offset*/) {
// seek is an unnecessary part of this implementatino
throw std::logic_error{"HttpStream::seek is unimplemented"};
}
@@ -77,8 +77,7 @@ int HttpStream::write(const uint8_t *value, int size) {
}
}
-int HttpStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t HttpStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
@@ -93,10 +92,10 @@ int HttpStream::read(uint8_t *buf, int buflen) {
started_ = true;
}
}
- return gsl::narrow<int>(http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen));
+ return http_read_callback_.readFully(reinterpret_cast<char*>(buf), buflen);
} else {
- return -1;
+ return STREAM_ERROR;
}
}
diff --git a/extensions/http-curl/client/HTTPStream.h b/extensions/http-curl/client/HTTPStream.h
index b034252..b23d515 100644
--- a/extensions/http-curl/client/HTTPStream.h
+++ b/extensions/http-curl/client/HTTPStream.h
@@ -76,7 +76,7 @@ class HttpStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return written;
@@ -90,7 +90,7 @@ class HttpStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/extensions/http-curl/tests/CivetStream.h b/extensions/http-curl/tests/CivetStream.h
index ce1c15d..d6e66e7 100644
--- a/extensions/http-curl/tests/CivetStream.h
+++ b/extensions/http-curl/tests/CivetStream.h
@@ -21,6 +21,8 @@
#include "io/BaseStream.h"
#include "civetweb.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -42,8 +44,10 @@ class CivetStream : public io::InputStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override {
- return mg_read(conn, buf, buflen);
+ size_t read(uint8_t *buf, size_t buflen) override {
+ const auto ret = mg_read(conn, buf, buflen);
+ if (ret < 0) return STREAM_ERROR;
+ return gsl::narrow<size_t>(ret);
}
protected:
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 6f71ade..890acb7 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -213,37 +213,51 @@ class FlowFileResponder : public ServerAwareHandler {
if (!wrong_uri) {
minifi::io::CivetStream civet_stream(conn);
minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
- uint32_t num_attributes;
- int read;
+ uint32_t num_attributes = 0;
uint64_t total_size = 0;
- read = stream.read(num_attributes);
- if (!isServerRunning())return false;
- assert(read > 0); total_size += read;
+ {
+ const auto read = stream.read(num_attributes);
+ if (!isServerRunning()) return false;
+ assert(read > 0);
+ total_size += read;
+ }
- auto flow = std::make_shared<FlowObj>();
+ const auto flow = std::make_shared<FlowObj>();
for (uint32_t i = 0; i < num_attributes; i++) {
std::string name, value;
- read = stream.read(name, true);
- if (!isServerRunning())return false;
- assert(read > 0); total_size += read;
- read = stream.read(value, true);
- if (!isServerRunning())return false;
- assert(read > 0); total_size += read;
+ {
+ const auto read = stream.read(name, true);
+ if (!isServerRunning()) return false;
+ assert(read > 0);
+ total_size += read;
+ }
+ {
+ const auto read = stream.read(value, true);
+ if (!isServerRunning()) return false;
+ assert(read > 0);
+ total_size += read;
+ }
flow->attributes[name] = value;
}
uint64_t length;
- read = stream.read(length);
- if (!isServerRunning())return false;
- assert(read > 0); total_size += read;
+ {
+ const auto read = stream.read(length);
+ if (!isServerRunning()) return false;
+ assert(read > 0);
+ total_size += read;
+ }
total_size += length;
flow->data.resize(gsl::narrow<size_t>(length));
flow->total_size = total_size;
- read = stream.read(flow->data.data(), gsl::narrow<int>(length));
- if (!isServerRunning())return false;
- assert(read == gsl::narrow<int>(length));
+ {
+ const auto read = stream.read(flow->data.data(), length);
+ if (!isServerRunning()) return false;
+ (void)read;
+ assert(read == length);
+ }
if (!invalid_checksum) {
site2site_rest_resp = std::to_string(stream.getCRC());
diff --git a/extensions/jni/jvm/JniReferenceObjects.h b/extensions/jni/jvm/JniReferenceObjects.h
index a55ffcc..5f994bb 100644
--- a/extensions/jni/jvm/JniReferenceObjects.h
+++ b/extensions/jni/jvm/JniReferenceObjects.h
@@ -139,7 +139,8 @@ class JniByteInputStream : public minifi::InputStreamCallback {
int writtenOffset = 0;
int read = 0;
do {
- int actual = stream_->read(buffer_, std::min(remaining, buffer_size_));
+ // JNI takes size as int, there's not much we can do here to support 2GB+ sizes
+ int actual = static_cast<int>(stream_->read(buffer_, std::min(remaining, buffer_size_)));
if (actual <= 0) {
if (read == 0) {
stream_ = nullptr;
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index 17c406b..9e06aa6 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -21,6 +21,7 @@
#define __COMPRESS_CONTENT_H__
#include <cinttypes>
+#include <utility>
#include "archive_entry.h"
#include "archive.h"
@@ -43,7 +44,7 @@ namespace minifi {
namespace processors {
// CompressContent Class
-class CompressContent: public core::Processor {
+class CompressContent : public core::Processor {
public:
// Constructor
/*!
@@ -56,7 +57,7 @@ public:
, encapsulateInTar_(false) {
}
// Destructor
- virtual ~CompressContent() = default;
+ ~CompressContent() override = default;
// Processor Name
static constexpr char const* ProcessorName = "CompressContent";
// Supported Properties
@@ -94,8 +95,8 @@ public:
ReadCallbackCompress(std::shared_ptr<core::FlowFile> &flow, struct archive *arch, struct archive_entry *entry) :
flow_(flow), arch_(arch), entry_(entry), status_(0), logger_(logging::LoggerFactory<CompressContent>::getLogger()) {
}
- ~ReadCallbackCompress() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ ~ReadCallbackCompress() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
uint8_t buffer[4096U];
int64_t ret = 0;
uint64_t read_size = 0;
@@ -107,24 +108,24 @@ public:
return -1;
}
while (read_size < flow_->getSize()) {
- ret = stream->read(buffer, sizeof(buffer));
- if (ret < 0) {
+ const auto readret = stream->read(buffer, sizeof(buffer));
+ if (io::isError(readret)) {
status_ = -1;
return -1;
}
- if (ret > 0) {
- ret = archive_write_data(arch_, buffer, gsl::narrow<size_t>(ret));
+ if (readret > 0) {
+ ret = archive_write_data(arch_, buffer, readret);
if (ret < 0) {
logger_->log_error("Compress Content archive error %s", archive_error_string(arch_));
status_ = -1;
return -1;
}
- read_size += ret;
+ read_size += gsl::narrow<uint64_t>(ret);
} else {
break;
}
}
- return read_size;
+ return gsl::narrow<int64_t>(read_size);
}
std::shared_ptr<core::FlowFile> flow_;
struct archive *arch_;
@@ -133,28 +134,24 @@ public:
std::shared_ptr<logging::Logger> logger_;
};
// Nest Callback Class for read stream from flow for decompress
- class ReadCallbackDecompress: public InputStreamCallback {
- public:
- ReadCallbackDecompress(const std::shared_ptr<core::FlowFile> &flow) :
- read_size_(0), offset_(0), flow_(flow) {
- origin_offset_ = flow_->getOffset();
+ struct ReadCallbackDecompress : InputStreamCallback {
+ explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
+ flow_file(std::move(flow)) {
}
- ~ReadCallbackDecompress() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- read_size_ = 0;
- stream->seek(offset_);
- int readRet = stream->read(buffer_, sizeof(buffer_));
- read_size_ = readRet;
- if (readRet > 0) {
- offset_ += read_size_;
+ ~ReadCallbackDecompress() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ stream->seek(offset);
+ const auto readRet = stream->read(buffer, sizeof(buffer));
+ stream_read_result = readRet;
+ if (!io::isError(readRet)) {
+ offset += readRet;
}
- return readRet;
+ return gsl::narrow<int64_t>(readRet);
}
- int64_t read_size_;
- uint8_t buffer_[8192];
- uint64_t offset_;
- uint64_t origin_offset_;
- std::shared_ptr<core::FlowFile> flow_;
+ size_t stream_read_result = 0; // read size or error code, to be checked with io::isError
+ uint8_t buffer[8192] = {0};
+ size_t offset = 0;
+ std::shared_ptr<core::FlowFile> flow_file;
};
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
@@ -190,16 +187,15 @@ public:
return ret;
}
- static la_ssize_t archive_read(struct archive *arch, void *context, const void **buff) {
- WriteCallback *callback = (WriteCallback *) context;
+ static la_ssize_t archive_read(struct archive* archive, void *context, const void **buff) {
+ auto *callback = (WriteCallback *) context;
callback->session_->read(callback->flow_, &callback->readDecompressCb_);
- if (callback->readDecompressCb_.read_size_ >= 0) {
- *buff = callback->readDecompressCb_.buffer_;
- return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
- } else {
- archive_set_error(arch, EIO, "Error reading flowfile");
+ *buff = callback->readDecompressCb_.buffer;
+ if (io::isError(callback->readDecompressCb_.stream_read_result)) {
+ archive_set_error(archive, EIO, "Error reading flowfile");
return -1;
}
+ return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result);
}
static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -383,13 +379,14 @@ public:
std::vector<uint8_t> buffer(16 * 1024U);
int64_t read_size = 0;
while (read_size < gsl::narrow<int64_t>(writer_.flow_->getSize())) {
- int ret = inputStream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
- if (ret < 0) {
+ const auto ret = inputStream->read(buffer.data(), buffer.size());
+ if (io::isError(ret)) {
return -1;
} else if (ret == 0) {
break;
} else {
- if (outputStream_->write(buffer.data(), ret) != ret) {
+ const auto writeret = outputStream_->write(buffer.data(), ret);
+ if (io::isError(writeret) || gsl::narrow<size_t>(writeret) != ret) {
return -1;
}
read_size += ret;
@@ -414,7 +411,7 @@ public:
success_ = filterStream->isFinished();
- return flow_->getSize();
+ return gsl::narrow<int64_t>(flow_->getSize());
}
};
@@ -444,7 +441,7 @@ private:
}
std::shared_ptr<logging::Logger> logger_;
- int compressLevel_;
+ int compressLevel_{};
CompressionMode compressMode_;
ExtendedCompressionFormat compressFormat_;
bool updateFileName_;
diff --git a/extensions/libarchive/FocusArchiveEntry.cpp b/extensions/libarchive/FocusArchiveEntry.cpp
index 7a91e2c..04bdb2c 100644
--- a/extensions/libarchive/FocusArchiveEntry.cpp
+++ b/extensions/libarchive/FocusArchiveEntry.cpp
@@ -34,6 +34,7 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "Exception.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -149,20 +150,20 @@ typedef struct {
la_ssize_t FocusArchiveEntry::ReadCallback::read_cb(struct archive * a, void *d, const void **buf) {
auto data = static_cast<FocusArchiveEntryReadData *>(d);
*buf = data->buf;
- int read = 0;
- int last_read = 0;
+ size_t read = 0;
+ size_t last_read = 0;
do {
last_read = data->stream->read(reinterpret_cast<uint8_t *>(data->buf), 8196 - read);
read += last_read;
- } while (data->processor->isRunning() && last_read > 0 && read < 8196);
+ } while (data->processor->isRunning() && last_read > 0 && !io::isError(last_read) && read < 8196);
if (!data->processor->isRunning()) {
archive_set_error(a, EINTR, "Processor shut down during read");
return -1;
}
- return read;
+ return gsl::narrow<la_ssize_t>(read);
}
int64_t FocusArchiveEntry::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index b83d028..dd7bac8 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -409,14 +409,13 @@ class ReadCallback : public InputStreamCallback {
}
for (size_t segment_num = 0; read_size_ < flow_size_; ++segment_num) {
- const int readRet = stream->read(buffer.data(), gsl::narrow<int>(buffer.size()));
- if (readRet < 0) {
+ const auto readRet = stream->read(buffer.data(), buffer.size());
+ if (io::isError(readRet)) {
status_ = -1;
error_ = "Failed to read from stream";
return read_size_;
}
-
- if (readRet <= 0) { break; }
+ if (readRet == 0) { break; }
const auto err = produce(segment_num, buffer, readRet);
if (err) {
diff --git a/extensions/mqtt/processors/ConvertJSONAck.h b/extensions/mqtt/processors/ConvertJSONAck.h
index 47f53d0..6c9d3cd 100644
--- a/extensions/mqtt/processors/ConvertJSONAck.h
+++ b/extensions/mqtt/processors/ConvertJSONAck.h
@@ -31,6 +31,8 @@
#include "MQTTClient.h"
#include "c2/protocols/RESTProtocol.h"
#include "ConvertBase.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -52,7 +54,7 @@ class ConvertJSONAck : public ConvertBase {
logger_(logging::LoggerFactory<ConvertJSONAck>::getLogger()) {
}
// Destructor
- virtual ~ConvertJSONAck() = default;
+ ~ConvertJSONAck() override = default;
// Processor Name
static constexpr char const* ProcessorName = "ConvertJSONAck";
@@ -72,14 +74,13 @@ class ConvertJSONAck : public ConvertBase {
class ReadCallback : public InputStreamCallback {
public:
ReadCallback() = default;
- ~ReadCallback() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t ret = 0;
+ ~ReadCallback() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
if (nullptr == stream)
return 0;
buffer_.resize(stream->size());
- ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
- return ret;
+ const auto ret = stream->read(reinterpret_cast<uint8_t*>(buffer_.data()), stream->size());
+ return !io::isError(ret) ? gsl::narrow<int64_t>(ret) : -1;
}
std::vector<char> buffer_;
};
diff --git a/extensions/mqtt/processors/ConvertUpdate.cpp b/extensions/mqtt/processors/ConvertUpdate.cpp
index 3460de9..9f3b456 100644
--- a/extensions/mqtt/processors/ConvertUpdate.cpp
+++ b/extensions/mqtt/processors/ConvertUpdate.cpp
@@ -37,7 +37,7 @@ void ConvertUpdate::onTrigger(const std::shared_ptr<core::ProcessContext> &conte
bool received_update = false;
while (mqtt_service_->get(100, listening_topic, update)) {
// first we have the input topic string followed by the update URI
- if (update.size() > 0) {
+ if (!update.empty()) {
io::BufferStream stream(update.data(), update.size());
std::string returnTopic, url;
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index b140c74..04368c7 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -20,6 +20,8 @@
#ifndef __PUBLISH_MQTT_H__
#define __PUBLISH_MQTT_H__
+#include <limits>
+
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
@@ -29,6 +31,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "MQTTClient.h"
#include "AbstractMQTTProcessor.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -50,7 +53,7 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
max_seg_size_ = ULLONG_MAX;
}
// Destructor
- virtual ~PublishMQTT() = default;
+ ~PublishMQTT() override = default;
// Processor Name
static constexpr char const* ProcessorName = "PublishMQTT";
// Supported Properties
@@ -74,35 +77,37 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
status_ = 0;
read_size_ = 0;
}
- ~ReadCallback() = default;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ ~ReadCallback() override = default;
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
if (flow_size_ < max_seg_size_)
max_seg_size_ = flow_size_;
+ gsl_Expects(max_seg_size_ < gsl::narrow<uint64_t>(std::numeric_limits<int>::max()));
std::vector<unsigned char> buffer(max_seg_size_);
read_size_ = 0;
status_ = 0;
while (read_size_ < flow_size_) {
- int readRet = stream->read(&buffer[0], max_seg_size_);
- if (readRet < 0) {
+ // MQTTClient_message::payloadlen is int, so we can't handle 2GB+
+ const auto readRet = stream->read(&buffer[0], max_seg_size_);
+ if (io::isError(readRet)) {
status_ = -1;
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
if (readRet > 0) {
MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = &buffer[0];
- pubmsg.payloadlen = readRet;
+ pubmsg.payloadlen = gsl::narrow<int>(readRet);
pubmsg.qos = qos_;
pubmsg.retained = retain_;
if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) {
status_ = -1;
return -1;
}
- read_size_ += readRet;
+ read_size_ += gsl::narrow<size_t>(readRet);
} else {
break;
}
}
- return read_size_;
+ return gsl::narrow<int64_t>(read_size_);
}
uint64_t flow_size_;
uint64_t max_seg_size_;
diff --git a/extensions/opc/src/putopc.cpp b/extensions/opc/src/putopc.cpp
index ef1be63..2e09369 100644
--- a/extensions/opc/src/putopc.cpp
+++ b/extensions/opc/src/putopc.cpp
@@ -37,6 +37,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "utils/StringUtils.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -427,21 +428,15 @@ namespace processors {
uint64_t size = 0;
do {
- int read = stream->read(buf_.data() + size, 1024);
-
- if (read < 0) {
- return -1;
- }
-
- if (read == 0) {
- break;
- }
+ const auto read = stream->read(buf_.data() + size, 1024);
+ if (io::isError(read)) return -1;
+ if (read == 0) break;
size += read;
} while (size < stream->size());
logger_->log_trace("Read %llu bytes from flowfile content to buffer", stream->size());
- return size;
+ return gsl::narrow<int64_t>(size);
}
} /* namespace processors */
diff --git a/extensions/rocksdb-repos/RocksDbStream.cpp b/extensions/rocksdb-repos/RocksDbStream.cpp
index 86daa20..735692b 100644
--- a/extensions/rocksdb-repos/RocksDbStream.cpp
+++ b/extensions/rocksdb-repos/RocksDbStream.cpp
@@ -47,7 +47,7 @@ RocksDbStream::RocksDbStream(std::string path, gsl::not_null<minifi::internal::R
void RocksDbStream::close() {
}
-void RocksDbStream::seek(uint64_t /*offset*/) {
+void RocksDbStream::seek(size_t /*offset*/) {
// noop
}
@@ -84,28 +84,17 @@ int RocksDbStream::write(const uint8_t *value, int size) {
}
}
-int RocksDbStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
- if (!exists_) {
- return -1;
- }
- if (buflen == 0) {
- return 0;
- }
- if (!IsNullOrEmpty(buf)) {
- size_t amtToRead = gsl::narrow<size_t>(buflen);
- if (offset_ >= value_.size()) {
- return 0;
- }
- if (amtToRead > value_.size() - offset_) {
- amtToRead = value_.size() - offset_;
- }
- std::memcpy(buf, value_.data() + offset_, amtToRead);
- offset_ += amtToRead;
- return gsl::narrow<int>(amtToRead);
- } else {
- return -1;
- }
+size_t RocksDbStream::read(uint8_t *buf, size_t buflen) {
+ // The check have to be in this order for RocksDBStreamTest "Read zero bytes" to succeed
+ if (!exists_) return STREAM_ERROR;
+ if (buflen == 0) return 0;
+ if (IsNullOrEmpty(buf)) return STREAM_ERROR;
+ if (offset_ >= value_.size()) return 0;
+
+ const auto amtToRead = std::min(buflen, value_.size() - offset_);
+ std::memcpy(buf, value_.data() + offset_, amtToRead);
+ offset_ += amtToRead;
+ return amtToRead;
}
} /* namespace io */
diff --git a/extensions/rocksdb-repos/RocksDbStream.h b/extensions/rocksdb-repos/RocksDbStream.h
index 6b4a525..cf4946f 100644
--- a/extensions/rocksdb-repos/RocksDbStream.h
+++ b/extensions/rocksdb-repos/RocksDbStream.h
@@ -56,7 +56,7 @@ class RocksDbStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return size_;
@@ -70,7 +70,7 @@ class RocksDbStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/extensions/script/lua/LuaBaseStream.cpp b/extensions/script/lua/LuaBaseStream.cpp
index 1387fae..3545c77 100644
--- a/extensions/script/lua/LuaBaseStream.cpp
+++ b/extensions/script/lua/LuaBaseStream.cpp
@@ -39,7 +39,7 @@ std::string LuaBaseStream::read(size_t len) {
}
if (len <= 0) {
- return nullptr;
+ return std::string{};
}
std::string buffer;
@@ -52,16 +52,11 @@ std::string LuaBaseStream::read(size_t len) {
// 0 <= n < s.size()."
//
// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3337.pdf
- auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), static_cast<int>(len));
- if (read < 0) {
- return nullptr;
+ const auto read = stream_->read(reinterpret_cast<uint8_t *>(&buffer[0]), len);
+ if (!io::isError(read) && read != len) {
+ buffer.resize(read);
}
-
- if (gsl::narrow<size_t>(read) != len) {
- buffer.resize(gsl::narrow<size_t>(read));
- }
-
- return buffer;
+ return io::isError(read) ? std::string{} : buffer;
}
size_t LuaBaseStream::write(std::string buf) {
diff --git a/extensions/script/python/PyBaseStream.cpp b/extensions/script/python/PyBaseStream.cpp
index fcde480..5e583f7 100644
--- a/extensions/script/python/PyBaseStream.cpp
+++ b/extensions/script/python/PyBaseStream.cpp
@@ -49,13 +49,11 @@ py::bytes PyBaseStream::read(size_t len) {
std::vector<uint8_t> buffer(len);
- auto read = stream_->read(buffer.data(), static_cast<int>(len));
- auto result = py::bytes(reinterpret_cast<char *>(buffer.data()), static_cast<size_t>(read));
-
- return result;
+ const auto read = stream_->read(buffer.data(), len);
+ return py::bytes(reinterpret_cast<char *>(buffer.data()), read);
}
-size_t PyBaseStream::write(py::bytes buf) {
+size_t PyBaseStream::write(const py::bytes& buf) {
const auto &&buf_str = buf.operator std::string();
return static_cast<size_t>(stream_->write(reinterpret_cast<uint8_t *>(const_cast<char *>(buf_str.data())),
static_cast<int>(buf_str.length())));
diff --git a/extensions/script/python/PyBaseStream.h b/extensions/script/python/PyBaseStream.h
index f07fe80..1f11065 100644
--- a/extensions/script/python/PyBaseStream.h
+++ b/extensions/script/python/PyBaseStream.h
@@ -37,7 +37,7 @@ class PyBaseStream {
py::bytes read();
py::bytes read(size_t len = 0);
- size_t write(py::bytes buf);
+ size_t write(const py::bytes& buf);
private:
std::shared_ptr<io::BaseStream> stream_;
diff --git a/extensions/sftp/client/SFTPClient.cpp b/extensions/sftp/client/SFTPClient.cpp
index e657d2e..7af066d 100644
--- a/extensions/sftp/client/SFTPClient.cpp
+++ b/extensions/sftp/client/SFTPClient.cpp
@@ -562,12 +562,12 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
return true;
}
- const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min<size_t>(expected_size, MAX_BUFFER_SIZE);
+ const size_t buf_size = expected_size < 0 ? MAX_BUFFER_SIZE : std::min(gsl::narrow<size_t>(expected_size), MAX_BUFFER_SIZE);
std::vector<uint8_t> buf(buf_size);
uint64_t total_read = 0U;
do {
- int read_ret = input.read(buf.data(), buf.size());
- if (read_ret < 0) {
+ const auto read_ret = input.read(buf.data(), buf.size());
+ if (io::isError(read_ret)) {
last_error_.setLibssh2Error(LIBSSH2_FX_OK);
logger_->log_error("Error while reading input");
return false;
@@ -577,20 +577,20 @@ bool SFTPClient::putFile(const std::string& path, io::BaseStream& input, bool ov
}
logger_->log_trace("Read %d bytes", read_ret);
total_read += read_ret;
- ssize_t remaining = read_ret;
+ auto remaining = read_ret;
while (remaining > 0) {
- int write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
+ const auto write_ret = libssh2_sftp_write(file_handle, reinterpret_cast<char*>(buf.data() + (read_ret - remaining)), remaining);
if (write_ret < 0) {
last_error_.setSftpError(SFTPError::IoError);
logger_->log_error("Failed to write remote file \"%s\"", path.c_str());
return false;
}
logger_->log_trace("Wrote %d bytes to remote file \"%s\"", write_ret, path.c_str());
- remaining -= write_ret;
+ remaining -= gsl::narrow<size_t>(write_ret);
}
} while (true);
- if (expected_size >= 0 && total_read != gsl::narrow<uint64_t>(expected_size)) {
+ if (expected_size >= 0 && total_read != gsl::narrow<size_t>(expected_size)) {
last_error_.setLibssh2Error(LIBSSH2_FX_OK);
logger_->log_error("Input has unexpected size, expected: %ld, actual: %lu", path.c_str(), expected_size, total_read);
return false;
diff --git a/extensions/standard-processors/processors/ExecuteProcess.cpp b/extensions/standard-processors/processors/ExecuteProcess.cpp
index 9350920..902d369 100644
--- a/extensions/standard-processors/processors/ExecuteProcess.cpp
+++ b/extensions/standard-processors/processors/ExecuteProcess.cpp
@@ -27,6 +27,7 @@
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "core/TypedValues.h"
+#include "utils/gsl.h"
#if defined(__clang__)
#pragma clang diagnostic push
@@ -160,11 +161,11 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
while (1) {
std::this_thread::sleep_for(std::chrono::milliseconds(_batchDuration));
char buffer[4096];
- int numRead = read(_pipefd[0], buffer, sizeof(buffer));
+ const auto numRead = read(_pipefd[0], buffer, sizeof(buffer));
if (numRead <= 0)
break;
- logger_->log_debug("Execute Command Respond %d", numRead);
- ExecuteProcess::WriteCallback callback(buffer, numRead);
+ logger_->log_debug("Execute Command Respond %zd", numRead);
+ ExecuteProcess::WriteCallback callback(buffer, gsl::narrow<uint64_t>(numRead));
auto flowFile = session->create();
if (!flowFile)
continue;
@@ -177,13 +178,13 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
} else {
char buffer[4096];
char *bufPtr = buffer;
- int totalRead = 0;
+ size_t totalRead = 0;
std::shared_ptr<core::FlowFile> flowFile = nullptr;
while (true) {
- int numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
+ const auto numRead = read(_pipefd[0], bufPtr, (sizeof(buffer) - totalRead));
if (numRead <= 0) {
if (totalRead > 0) {
- logger_->log_debug("Execute Command Respond %d", totalRead);
+ logger_->log_debug("Execute Command Respond %zu", totalRead);
// child exits and close the pipe
ExecuteProcess::WriteCallback callback(buffer, totalRead);
if (!flowFile) {
@@ -200,9 +201,9 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi
}
break;
} else {
- if (numRead == static_cast<int>((sizeof(buffer) - totalRead))) {
+ if (numRead == static_cast<ssize_t>((sizeof(buffer) - totalRead))) {
// we reach the max buffer size
- logger_->log_debug("Execute Command Max Respond %d", sizeof(buffer));
+ logger_->log_debug("Execute Command Max Respond %zu", sizeof(buffer));
ExecuteProcess::WriteCallback callback(buffer, sizeof(buffer));
if (!flowFile) {
flowFile = session->create();
diff --git a/extensions/standard-processors/processors/ExtractText.cpp b/extensions/standard-processors/processors/ExtractText.cpp
index 345b84a..23e0fae 100644
--- a/extensions/standard-processors/processors/ExtractText.cpp
+++ b/extensions/standard-processors/processors/ExtractText.cpp
@@ -32,8 +32,8 @@
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/FlowFile.h"
-
#include "utils/RegexUtils.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -113,10 +113,9 @@ void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession
}
int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t ret = 0;
- uint64_t read_size = 0;
+ size_t read_size = 0;
bool regex_mode;
- uint64_t size_limit = flowFile_->getSize();
+ size_t size_limit = flowFile_->getSize();
std::string attrKey, sizeLimitStr;
ctx_->getProperty(Attribute.getName(), attrKey);
@@ -126,22 +125,22 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
if (sizeLimitStr.empty())
size_limit = DEFAULT_SIZE_LIMIT;
else if (sizeLimitStr != "0")
- size_limit = std::stoi(sizeLimitStr);
+ size_limit = static_cast<size_t>(std::stoi(sizeLimitStr));
std::ostringstream contentStream;
while (read_size < size_limit) {
// Don't read more than config limit or the size of the buffer
- int length = gsl::narrow<int>(std::min<uint64_t>(size_limit - read_size, buffer_.size()));
- ret = stream->read(buffer_, length);
+ const auto length = std::min(size_limit - read_size, buffer_.size());
+ const auto ret = stream->read(buffer_, length);
- if (ret < 0) {
+ if (io::isError(ret)) {
return -1; // Stream error
} else if (ret == 0) {
break; // End of stream, no more data
}
- contentStream.write(reinterpret_cast<const char*>(buffer_.data()), ret);
+ contentStream.write(reinterpret_cast<const char*>(buffer_.data()), gsl::narrow<std::streamsize>(ret));
read_size += ret;
if (contentStream.fail()) {
return -1;
@@ -162,9 +161,11 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
bool repeatingcapture;
ctx_->getProperty(EnableRepeatingCaptureGroup.getName(), repeatingcapture);
- int maxCaptureSizeProperty;
- ctx_->getProperty(MaxCaptureGroupLen.getName(), maxCaptureSizeProperty);
- size_t maxCaptureSize = gsl::narrow<size_t>(maxCaptureSizeProperty);
+ const size_t maxCaptureSize = [this] {
+ uint64_t val;
+ ctx_->getProperty(MaxCaptureGroupLen.getName(), val);
+ return gsl::narrow<size_t>(val);
+ }();
std::string contentStr = contentStream.str();
@@ -212,7 +213,7 @@ int64_t ExtractText::ReadCallback::process(const std::shared_ptr<io::BaseStream>
} else {
flowFile_->setAttribute(attrKey, contentStream.str());
}
- return read_size;
+ return gsl::narrow<int64_t>(read_size);
}
ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx, std::shared_ptr<logging::Logger> lgr)
diff --git a/extensions/standard-processors/processors/GetTCP.cpp b/extensions/standard-processors/processors/GetTCP.cpp
index 749d515..1c93a8e 100644
--- a/extensions/standard-processors/processors/GetTCP.cpp
+++ b/extensions/standard-processors/processors/GetTCP.cpp
@@ -166,12 +166,12 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
do {
if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
buffer.resize(receive_buffer_size_);
- int size_read = socket_ptr->read(buffer.data(), gsl::narrow<int>(receive_buffer_size_), false);
- if (size_read >= 0) {
- if (size_read > 0) {
+ const auto size_read = socket_ptr->read(buffer.data(), receive_buffer_size_, false);
+ if (!io::isError(size_read)) {
+ if (size_read != 0) {
// determine cut location
- int startLoc = 0, i = 0;
- for (; i < size_read; i++) {
+ size_t startLoc = 0;
+ for (size_t i = 0; i < size_read; i++) {
if (buffer.at(i) == endOfMessageByte && i > 0) {
if (i-startLoc > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true);
@@ -193,7 +193,7 @@ void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, co
reconnects = 0;
}
socket_ring_buffer_.enqueue(std::move(socket_ptr));
- } else if (size_read == -2 && stay_connected_) {
+ } else if (size_read == static_cast<size_t>(-2) && stay_connected_) {
if (++reconnects > connection_attempt_limit_) {
logger_->log_info("Too many reconnects, exiting thread");
socket_ptr->close();
diff --git a/extensions/standard-processors/processors/LogAttribute.h b/extensions/standard-processors/processors/LogAttribute.h
index 31445d6..ff886d0 100644
--- a/extensions/standard-processors/processors/LogAttribute.h
+++ b/extensions/standard-processors/processors/LogAttribute.h
@@ -47,14 +47,14 @@ class LogAttribute : public core::Processor {
* Create a new processor
*/
explicit LogAttribute(const std::string& name, const utils::Identifier& uuid = {})
- : Processor(std::move(name), uuid),
+ : Processor(name, uuid),
flowfiles_to_log_(1),
hexencode_(false),
max_line_length_(80U),
logger_(logging::LoggerFactory<LogAttribute>::getLogger()) {
}
// Destructor
- virtual ~LogAttribute() = default;
+ ~LogAttribute() override = default;
// Processor Name
static constexpr char const* ProcessorName = "LogAttribute";
// Supported Properties
@@ -103,16 +103,14 @@ class LogAttribute : public core::Processor {
: logger_(std::move(logger))
, buffer_(size) {
}
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
- if (buffer_.size() == 0U) {
- return 0U;
- }
- int ret = stream->read(buffer_.data(), gsl::narrow<int>(buffer_.size()));
- if (ret < 0 || static_cast<uint64_t>(ret) != buffer_.size()) {
- logger_->log_error("%zu bytes were requested from the stream but %d bytes were read. Rolling back.", buffer_.size(), ret);
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+ if (buffer_.empty()) return 0U;
+ const auto ret = stream->read(buffer_.data(), buffer_.size());
+ if (ret != buffer_.size()) {
+ logger_->log_error("%zu bytes were requested from the stream but %zu bytes were read. Rolling back.", buffer_.size(), size_t{ret});
throw Exception(PROCESSOR_EXCEPTION, "Failed to read the entire FlowFile.");
}
- return buffer_.size();
+ return gsl::narrow<int64_t>(buffer_.size());
}
std::shared_ptr<logging::Logger> logger_;
std::vector<uint8_t> buffer_;
@@ -123,7 +121,7 @@ class LogAttribute : public core::Processor {
// OnTrigger method, implemented by NiFi LogAttribute
void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi LogAttribute
- void initialize(void) override;
+ void initialize() override;
private:
core::annotation::Input getInputRequirement() const override {
diff --git a/extensions/standard-processors/processors/PutFile.cpp b/extensions/standard-processors/processors/PutFile.cpp
index 33e223c..9efae99 100644
--- a/extensions/standard-processors/processors/PutFile.cpp
+++ b/extensions/standard-processors/processors/PutFile.cpp
@@ -25,10 +25,12 @@
#include <memory>
#include <string>
#include <set>
+#include <utility>
#ifdef WIN32
#include <Windows.h>
#endif
#include "utils/file/FileUtils.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -304,9 +306,9 @@ void PutFile::getDirectoryPermissions(core::ProcessContext *context) {
}
#endif
-PutFile::ReadCallback::ReadCallback(const std::string &tmp_file, const std::string &dest_file)
- : tmp_file_(tmp_file),
- dest_file_(dest_file) {
+PutFile::ReadCallback::ReadCallback(std::string tmp_file, std::string dest_file)
+ : tmp_file_(std::move(tmp_file)),
+ dest_file_(std::move(dest_file)) {
}
// Copy the entire file contents to the temporary file
@@ -319,17 +321,10 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
std::ofstream tmp_file_os(tmp_file_, std::ios::out | std::ios::binary);
do {
- int read = stream->read(buffer, 1024);
-
- if (read < 0) {
- return -1;
- }
-
- if (read == 0) {
- break;
- }
-
- tmp_file_os.write(reinterpret_cast<char *>(buffer), read);
+ const auto read = stream->read(buffer, 1024);
+ if (io::isError(read)) return -1;
+ if (read == 0) break;
+ tmp_file_os.write(reinterpret_cast<char *>(buffer), gsl::narrow<std::streamsize>(read));
size += read;
} while (size < stream->size());
@@ -339,7 +334,7 @@ int64_t PutFile::ReadCallback::process(const std::shared_ptr<io::BaseStream>& st
write_succeeded_ = true;
}
- return size;
+ return gsl::narrow<int64_t>(size);
}
// Renames tmp file to final destination
diff --git a/extensions/standard-processors/processors/PutFile.h b/extensions/standard-processors/processors/PutFile.h
index 47e767d..47509e2 100644
--- a/extensions/standard-processors/processors/PutFile.h
+++ b/extensions/standard-processors/processors/PutFile.h
@@ -82,7 +82,7 @@ class PutFile : public core::Processor {
class ReadCallback : public InputStreamCallback {
public:
- ReadCallback(const std::string &tmp_file, const std::string &dest_file);
+ ReadCallback(std::string tmp_file, std::string dest_file);
~ReadCallback() override;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override;
bool commit();
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 836bbd2..26ea193 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -399,7 +399,7 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
std::string rolling_filename_pattern_glob;
context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
- initial_start_position_ = utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values());
+ initial_start_position_ = InitialStartPositions{utils::parsePropertyWithAllowableValuesOrThrow(*context, InitialStartPosition.getName(), InitialStartPositions::values())};
}
void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp
index 4419caa..14d349e 100644
--- a/extensions/tensorflow/TFApplyGraph.cpp
+++ b/extensions/tensorflow/TFApplyGraph.cpp
@@ -16,10 +16,12 @@
*/
#include "TFApplyGraph.h"
-#include <core/ProcessContext.h>
-#include <core/ProcessSession.h>
#include <tensorflow/cc/ops/standard_ops.h>
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -191,29 +193,23 @@ void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext> &contex
int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string graph_proto_buf;
graph_proto_buf.resize(stream->size());
- auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]),
- static_cast<int>(stream->size()));
-
+ const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream");
}
-
graph_def_->ParseFromString(graph_proto_buf);
- return num_read;
+ return gsl::narrow<int64_t>(num_read);
}
int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string tensor_proto_buf;
tensor_proto_buf.resize(stream->size());
- auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
- static_cast<int>(stream->size()));
-
+ const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
}
-
tensor_proto_->ParseFromString(tensor_proto_buf);
- return num_read;
+ return gsl::narrow<int64_t>(num_read);
}
int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp
index 5f94548..aea09f8 100644
--- a/extensions/tensorflow/TFConvertImageToTensor.cpp
+++ b/extensions/tensorflow/TFConvertImageToTensor.cpp
@@ -321,14 +321,10 @@ int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr
if (tensor_->AllocatedBytes() < stream->size()) {
throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes");
}
-
- auto num_read = stream->read(tensor_->flat<unsigned char>().data(),
- static_cast<int>(stream->size()));
-
+ const auto num_read = stream->read(tensor_->flat<unsigned char>().data(), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
}
-
return num_read;
}
diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp
index 1786b6f..2f44f84 100644
--- a/extensions/tensorflow/TFExtractTopLabels.cpp
+++ b/extensions/tensorflow/TFExtractTopLabels.cpp
@@ -19,6 +19,8 @@
#include "tensorflow/cc/ops/standard_ops.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -122,7 +124,7 @@ void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext> &
}
int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
- int64_t total_read = 0;
+ size_t total_read = 0;
std::string label;
uint64_t max_label_len = 65536;
label.resize(max_label_len);
@@ -132,9 +134,9 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
buf.resize(buf_size);
while (total_read < stream->size()) {
- auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), static_cast<int>(buf_size));
-
- for (auto i = 0; i < read; i++) {
+ const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), buf_size);
+ if (io::isError(read)) break;
+ for (size_t i = 0; i < read; i++) {
if (buf[i] == '\n' || total_read + i == stream->size()) {
labels_->emplace_back(label.substr(0, label_size));
label_size = 0;
@@ -147,21 +149,18 @@ int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io
total_read += read;
}
- return total_read;
+ return gsl::narrow<int64_t>(total_read);
}
int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
std::string tensor_proto_buf;
tensor_proto_buf.resize(stream->size());
- auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]),
- static_cast<int>(stream->size()));
-
+ const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size());
if (num_read != stream->size()) {
throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream");
}
-
tensor_proto_->ParseFromString(tensor_proto_buf);
- return num_read;
+ return gsl::narrow<int64_t>(num_read);
}
} /* namespace processors */
diff --git a/libminifi/include/io/AtomicEntryStream.h b/libminifi/include/io/AtomicEntryStream.h
index 75b2d4c..6456ff2 100644
--- a/libminifi/include/io/AtomicEntryStream.h
+++ b/libminifi/include/io/AtomicEntryStream.h
@@ -64,7 +64,7 @@ class AtomicEntryStream : public BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return length_;
@@ -75,7 +75,7 @@ class AtomicEntryStream : public BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
@@ -103,7 +103,7 @@ AtomicEntryStream<T>::~AtomicEntryStream() {
}
template<typename T>
-void AtomicEntryStream<T>::seek(uint64_t offset) {
+void AtomicEntryStream<T>::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
offset_ = gsl::narrow<size_t>(offset);
}
@@ -129,14 +129,13 @@ int AtomicEntryStream<T>::write(const uint8_t *value, int size) {
}
template<typename T>
-int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t AtomicEntryStream<T>::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
if (nullptr != buf && !invalid_stream_) {
std::lock_guard<std::recursive_mutex> lock(entry_lock_);
- int len = buflen;
+ auto len = buflen;
core::repository::RepoValue<T> *value;
if (entry_->getValue(key_, &value)) {
if (offset_ + len > value->getBufferSize()) {
@@ -152,7 +151,7 @@ int AtomicEntryStream<T>::read(uint8_t *buf, int buflen) {
return len;
}
}
- return -1;
+ return STREAM_ERROR;
}
} // namespace io
diff --git a/libminifi/include/io/BufferStream.h b/libminifi/include/io/BufferStream.h
index 3f0bfdc..7ab2b9e 100644
--- a/libminifi/include/io/BufferStream.h
+++ b/libminifi/include/io/BufferStream.h
@@ -35,7 +35,7 @@ class BufferStream : public BaseStream {
public:
BufferStream() = default;
- BufferStream(const uint8_t *buf, const unsigned int len) {
+ BufferStream(const uint8_t *buf, const size_t len) {
write(buf, len);
}
@@ -48,7 +48,7 @@ class BufferStream : public BaseStream {
int write(const uint8_t* data, int len) final;
- int read(uint8_t* buffer, int len) override;
+ size_t read(uint8_t* buffer, size_t len) override;
int initialize() override {
buffer_.clear();
@@ -56,7 +56,7 @@ class BufferStream : public BaseStream {
return 0;
}
- void seek(uint64_t offset) override {
+ void seek(size_t offset) override {
readOffset_ += offset;
}
diff --git a/libminifi/include/io/CRCStream.h b/libminifi/include/io/CRCStream.h
index 2204537..b6ae751 100644
--- a/libminifi/include/io/CRCStream.h
+++ b/libminifi/include/io/CRCStream.h
@@ -84,9 +84,9 @@ class InputCRCStream : public virtual CRCStreamBase<StreamType>, public InputStr
public:
using InputStream::read;
- int read(uint8_t *buf, int buflen) override {
- int ret = child_stream_->read(buf, buflen);
- if (ret > 0) {
+ size_t read(uint8_t *buf, size_t buflen) override {
+ const auto ret = child_stream_->read(buf, buflen);
+ if (ret > 0 && !io::isError(ret)) {
crc_ = crc32(crc_, buf, ret);
}
return ret;
diff --git a/libminifi/include/io/ClientSocket.h b/libminifi/include/io/ClientSocket.h
index 84f3836..db71cba 100644
--- a/libminifi/include/io/ClientSocket.h
+++ b/libminifi/include/io/ClientSocket.h
@@ -159,7 +159,7 @@ class Socket : public BaseStream {
* @param buflen
* @param retrieve_all_bytes determines if we should read all bytes before returning
*/
- int read(uint8_t *buf, int buflen) override {
+ size_t read(uint8_t *buf, size_t buflen) override {
return read(buf, buflen, true);
}
@@ -169,7 +169,7 @@ class Socket : public BaseStream {
* @param buflen
* @param retrieve_all_bytes determines if we should read all bytes before returning
*/
- virtual int read(uint8_t *buf, int buflen, bool retrieve_all_bytes);
+ virtual size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes);
protected:
/**
diff --git a/libminifi/include/io/DescriptorStream.h b/libminifi/include/io/DescriptorStream.h
index a7ea713..ee5aec7 100644
--- a/libminifi/include/io/DescriptorStream.h
+++ b/libminifi/include/io/DescriptorStream.h
@@ -52,14 +52,14 @@ class DescriptorStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/FileStream.h b/libminifi/include/io/FileStream.h
index 3156a30..ad7f0dc 100644
--- a/libminifi/include/io/FileStream.h
+++ b/libminifi/include/io/FileStream.h
@@ -63,7 +63,7 @@ class FileStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return length_;
@@ -77,7 +77,7 @@ class FileStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h
index 60a297c..ba2342b 100644
--- a/libminifi/include/io/InputStream.h
+++ b/libminifi/include/io/InputStream.h
@@ -36,46 +36,46 @@ class InputStream : public virtual Stream {
throw std::runtime_error("Querying size is not supported");
}
/**
- * reads a byte array from the stream
+ * Reads a byte array from the stream. Use isError (Stream.h) to check for errors.
* @param value reference in which will set the result
* @param len length to read
- * @return resulting read size
+ * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
**/
- virtual int read(uint8_t *value, int len) = 0;
+ virtual size_t read(uint8_t *value, size_t len) = 0;
- int read(std::vector<uint8_t>& buffer, int len);
+ size_t read(std::vector<uint8_t>& buffer, size_t len);
/**
- * read string from stream
+ * Read string from stream. Use isError (Stream.h) to check for errors.
* @param str reference string
- * @return resulting read size
+ * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
**/
- int read(std::string &str, bool widen = false);
+ size_t read(std::string &str, bool widen = false);
/**
- * read a bool from stream
+ * Read a bool from stream. Use isError (Stream.h) to check for errors.
* @param value reference to the output
- * @return resulting read size
+ * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
**/
- int read(bool& value);
+ size_t read(bool& value);
/**
- * read a uuid from stream
+ * Read a uuid from stream. Use isError (Stream.h) to check for errors.
* @param value reference to the output
- * @return resulting read size
+ * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
**/
- int read(utils::Identifier& value);
+ size_t read(utils::Identifier& value);
/**
- * reads sizeof(Integral) bytes from the stream
- * @param value reference in which will set the result
- * @return resulting read size
- **/
+ * Reads sizeof(Integral) bytes from the stream. Use isError (Stream.h) to check for errors.
+ * @param value reference in which will set the result
+ * @return resulting read size or STREAM_ERROR on error or static_cast<size_t>(-2) on EAGAIN
+ **/
template<typename Integral, typename = std::enable_if<std::is_unsigned<Integral>::value && !std::is_same<Integral, bool>::value>>
- int read(Integral& value) {
+ size_t read(Integral& value) {
uint8_t buf[sizeof(Integral)]{};
if (read(buf, sizeof(Integral)) != sizeof(Integral)) {
- return -1;
+ return io::STREAM_ERROR;
}
value = 0;
diff --git a/libminifi/include/io/Stream.h b/libminifi/include/io/Stream.h
index e067b52..e694c2e 100644
--- a/libminifi/include/io/Stream.h
+++ b/libminifi/include/io/Stream.h
@@ -24,6 +24,17 @@ namespace nifi {
namespace minifi {
namespace io {
+constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);
+
+inline bool isError(const size_t read_return) noexcept {
+ return read_return == STREAM_ERROR // general error
+ || read_return == static_cast<size_t>(-2); // Socket EAGAIN, to be refactored to eliminate this error condition
+}
+
+inline bool isError(const int write_return) noexcept {
+ return write_return == -1;
+}
+
/**
* All streams serialize/deserialize in big-endian
*/
@@ -31,7 +42,7 @@ class Stream {
public:
virtual void close() {}
- virtual void seek(uint64_t /*offset*/) {
+ virtual void seek(size_t /*offset*/) {
throw std::runtime_error("Seek is not supported");
}
diff --git a/libminifi/include/io/StreamPipe.h b/libminifi/include/io/StreamPipe.h
index b3fafce..abac015 100644
--- a/libminifi/include/io/StreamPipe.h
+++ b/libminifi/include/io/StreamPipe.h
@@ -49,14 +49,10 @@ inline int64_t pipe(const std::shared_ptr<io::InputStream>& src, const std::shar
uint8_t buffer[4096U];
int64_t totalTransferred = 0;
while (true) {
- int readRet = src->read(buffer, sizeof(buffer));
- if (readRet < 0) {
- return readRet;
- }
- if (readRet == 0) {
- break;
- }
- int remaining = readRet;
+ const auto readRet = src->read(buffer, sizeof(buffer));
+ if (io::isError(readRet)) return -1;
+ if (readRet == 0) break;
+ auto remaining = readRet;
int transferred = 0;
while (remaining > 0) {
int writeRet = dst->write(buffer + transferred, remaining);
diff --git a/libminifi/include/io/tls/SecureDescriptorStream.h b/libminifi/include/io/tls/SecureDescriptorStream.h
index 7699910..5f285c1 100644
--- a/libminifi/include/io/tls/SecureDescriptorStream.h
+++ b/libminifi/include/io/tls/SecureDescriptorStream.h
@@ -63,7 +63,7 @@ class SecureDescriptorStream : public io::BaseStream {
* Skip to the specified offset.
* @param offset offset to which we will skip
*/
- void seek(uint64_t offset) override;
+ void seek(size_t offset) override;
size_t size() const override {
return -1;
@@ -74,7 +74,7 @@ class SecureDescriptorStream : public io::BaseStream {
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* writes value to stream
diff --git a/libminifi/include/io/tls/TLSSocket.h b/libminifi/include/io/tls/TLSSocket.h
index 6bf741d..24df1d9 100644
--- a/libminifi/include/io/tls/TLSSocket.h
+++ b/libminifi/include/io/tls/TLSSocket.h
@@ -136,14 +136,14 @@ class TLSSocket : public Socket {
using Socket::read;
using Socket::write;
- int read(uint8_t *buf, int buflen, bool retrieve_all_bytes) override;
+ size_t read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) override;
/**
* Reads data and places it into buf
* @param buf buffer in which we extract data
* @param buflen
*/
- int read(uint8_t *buf, int buflen) override;
+ size_t read(uint8_t *buf, size_t buflen) override;
/**
* Write value to the stream using uint8_t ptr
diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h
index 3b7b4f7..ed1cadc 100644
--- a/libminifi/include/provenance/Provenance.h
+++ b/libminifi/include/provenance/Provenance.h
@@ -363,25 +363,23 @@ class ProvenanceEventRecord : public core::SerializableComponent {
uint64_t getEventTime(const uint8_t *buffer, const size_t bufferSize) {
const auto size = std::min<size_t>(72, bufferSize);
- org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<int>(size));
+ org::apache::nifi::minifi::io::BufferStream outStream(buffer, size);
std::string uuid;
- int ret = outStream.read(uuid);
-
- if (ret <= 0) {
+ const auto uuidret = outStream.read(uuid);
+ if (uuidret == 0 || io::isError(uuidret)) {
return 0;
}
uint32_t eventType;
- ret = outStream.read(eventType);
- if (ret != 4) {
+ const auto typeret = outStream.read(eventType);
+ if (typeret != 4) {
return 0;
}
uint64_t event_time;
-
- ret = outStream.read(event_time);
- if (ret != 8) {
+ const auto timeret = outStream.read(event_time);
+ if (timeret != 8) {
return 0;
}
diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h
index a82e3b4..d2138ea 100644
--- a/libminifi/include/sitetosite/Peer.h
+++ b/libminifi/include/sitetosite/Peer.h
@@ -299,7 +299,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream {
return stream_->write(data, len);
}
- int read(uint8_t* data, int len) override {
+ size_t read(uint8_t* data, size_t len) override {
return stream_->read(data, len);
}
diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h
index fc1dc91..d142bfa 100644
--- a/libminifi/include/sitetosite/SiteToSiteClient.h
+++ b/libminifi/include/sitetosite/SiteToSiteClient.h
@@ -19,6 +19,7 @@
#ifndef LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
#define LIBMINIFI_INCLUDE_SITETOSITE_SITETOSITECLIENT_H_
+#include <algorithm>
#include <map>
#include <memory>
#include <string>
@@ -30,6 +31,7 @@
#include "core/ProcessSession.h"
#include "core/ProcessContext.h"
#include "core/Connectable.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -43,15 +45,14 @@ namespace sitetosite {
*/
class DataPacket {
public:
- DataPacket(const std::shared_ptr<logging::Logger> &logger, const std::shared_ptr<Transaction> &transaction, std::map<std::string, std::string> attributes, const std::string &payload)
- : payload_(payload),
- logger_reference_(logger) {
- _size = 0;
- transaction_ = transaction;
- _attributes = attributes;
+ DataPacket(std::shared_ptr<logging::Logger> logger, std::shared_ptr<Transaction> transaction, std::map<std::string, std::string> attributes, const std::string &payload)
+ : _attributes{std::move(attributes)},
+ transaction_{std::move(transaction)},
+ payload_{payload},
+ logger_reference_{std::move(logger)} {
}
std::map<std::string, std::string> _attributes;
- uint64_t _size;
+ uint64_t _size{0};
std::shared_ptr<Transaction> transaction_;
const std::string & payload_;
std::shared_ptr<logging::Logger> logger_reference_;
@@ -60,24 +61,10 @@ class DataPacket {
class SiteToSiteClient : public core::Connectable {
public:
SiteToSiteClient()
- : core::Connectable("SitetoSiteClient"),
- peer_state_(IDLE),
- _batchSendNanos(5000000000),
- ssl_context_service_(nullptr),
- logger_(logging::LoggerFactory<SiteToSiteClient>::getLogger()) {
- _supportedVersion[0] = 5;
- _supportedVersion[1] = 4;
- _supportedVersion[2] = 3;
- _supportedVersion[3] = 2;
- _supportedVersion[4] = 1;
- _currentVersion = _supportedVersion[0];
- _currentVersionIndex = 0;
- _supportedCodecVersion[0] = 1;
- _currentCodecVersion = _supportedCodecVersion[0];
- _currentCodecVersionIndex = 0;
+ : core::Connectable("SitetoSiteClient") {
}
- virtual ~SiteToSiteClient() = default;
+ ~SiteToSiteClient() override = default;
void setSSLContextService(const std::shared_ptr<minifi::controllers::SSLContextService> &context_service) {
ssl_context_service_ = context_service;
@@ -189,13 +176,13 @@ class SiteToSiteClient : public core::Connectable {
return logger_;
}
- virtual void yield() {
+ void yield() override {
}
/**
* Determines if we are connected and operating
*/
- virtual bool isRunning() {
+ bool isRunning() override {
return running_;
}
@@ -203,7 +190,7 @@ class SiteToSiteClient : public core::Connectable {
* Determines if work is available by this connectable
* @return boolean if work is available.
*/
- virtual bool isWorkAvailable() {
+ bool isWorkAvailable() override {
return true;
}
@@ -234,16 +221,16 @@ class SiteToSiteClient : public core::Connectable {
virtual int writeResponse(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message);
// getRespondCodeContext
virtual RespondCodeContext *getRespondCodeContext(RespondCode code) {
- for (unsigned int i = 0; i < sizeof(SiteToSiteRequest::respondCodeContext) / sizeof(RespondCodeContext); i++) {
- if (SiteToSiteRequest::respondCodeContext[i].code == code) {
- return &SiteToSiteRequest::respondCodeContext[i];
+ for (auto & i : SiteToSiteRequest::respondCodeContext) {
+ if (i.code == code) {
+ return &i;
}
}
- return NULL;
+ return nullptr;
}
// Peer State
- PeerState peer_state_;
+ PeerState peer_state_{PeerState::IDLE};
// portId
utils::Identifier port_id_;
@@ -254,28 +241,28 @@ class SiteToSiteClient : public core::Connectable {
// Peer Connection
std::unique_ptr<SiteToSitePeer> peer_;
- std::atomic<bool> running_;
+ std::atomic<bool> running_{false};
// transaction map
std::map<utils::Identifier, std::shared_ptr<Transaction>> known_transactions_;
// BATCH_SEND_NANOS
- uint64_t _batchSendNanos;
+ uint64_t _batchSendNanos{5000000000};
/***
* versioning
*/
- uint32_t _supportedVersion[5];
- uint32_t _currentVersion;
- int _currentVersionIndex;
- uint32_t _supportedCodecVersion[1];
- uint32_t _currentCodecVersion;
- int _currentCodecVersionIndex;
+ uint32_t _supportedVersion[5] = {5, 4, 3, 2, 1};
+ int _currentVersionIndex{0};
+ uint32_t _currentVersion{_supportedVersion[_currentVersionIndex]};
+ uint32_t _supportedCodecVersion[1] = {1};
+ int _currentCodecVersionIndex{0};
+ uint32_t _currentCodecVersion{_supportedCodecVersion[_currentCodecVersionIndex]};
std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
private:
- std::shared_ptr<logging::Logger> logger_;
+ std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<SiteToSiteClient>::getLogger()};
};
// Nest Callback Class for write stream
@@ -286,13 +273,13 @@ class WriteCallback : public OutputStreamCallback {
}
DataPacket *_packet;
// void process(std::ofstream *stream) {
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
uint8_t buffer[16384];
uint64_t len = _packet->_size;
uint64_t total = 0;
while (len > 0) {
- int size = len < 16384 ? static_cast<int>(len) : 16384;
- int ret = _packet->transaction_->getStream().read(buffer, size);
+ const auto size = std::min(len, uint64_t{16384});
+ const auto ret = _packet->transaction_->getStream().read(buffer, size);
if (ret != size) {
logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len;
return -1;
@@ -302,7 +289,7 @@ class WriteCallback : public OutputStreamCallback {
total += size;
}
logging::LOG_INFO(_packet->logger_reference_) << "Received " << total << " from stream";
- return len;
+ return gsl::narrow<int64_t>(len);
}
};
// Nest Callback Class for read stream
@@ -312,29 +299,23 @@ class ReadCallback : public InputStreamCallback {
: _packet(packet) {
}
DataPacket *_packet;
- int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+ int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
_packet->_size = 0;
uint8_t buffer[8192] = { 0 };
- int readSize;
size_t size = 0;
do {
- readSize = stream->read(buffer, 8192);
-
- if (readSize == 0) {
- break;
- }
- if (readSize < 0) {
- return -1;
- }
- int ret = _packet->transaction_->getStream().write(buffer, readSize);
- if (ret != readSize) {
+ const auto readSize = stream->read(buffer, 8192);
+ if (readSize == 0) break;
+ if (io::isError(readSize)) return -1;
+ const auto ret = _packet->transaction_->getStream().write(buffer, readSize);
+ if (ret < 0 || gsl::narrow<size_t>(ret) != readSize) {
logging::LOG_INFO(_packet->logger_reference_) << "Site2Site Send Flow Size " << readSize << " Failed " << ret;
return -1;
}
size += readSize;
} while (size < stream->size());
_packet->_size = size;
- return size;
+ return gsl::narrow<int64_t>(size);
}
};
diff --git a/libminifi/include/utils/ByteArrayCallback.h b/libminifi/include/utils/ByteArrayCallback.h
index 84e3362..ace72b6 100644
--- a/libminifi/include/utils/ByteArrayCallback.h
+++ b/libminifi/include/utils/ByteArrayCallback.h
@@ -24,6 +24,7 @@
#include "concurrentqueue.h"
#include "FlowFileRecord.h"
#include "core/logging/LoggerConfiguration.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -45,10 +46,10 @@ class ByteInputCallBack : public InputStreamCallback {
if (stream->size() > 0) {
vec.resize(stream->size());
- stream->read(reinterpret_cast<uint8_t*>(vec.data()), gsl::narrow<int>(stream->size()));
+ stream->read(reinterpret_cast<uint8_t*>(vec.data()), stream->size());
}
- return vec.size();
+ return gsl::narrow<int64_t>(vec.size());
}
virtual void seek(size_t) { }
@@ -101,7 +102,7 @@ class ByteOutputCallback : public OutputStreamCallback {
virtual int64_t process(const std::shared_ptr<io::BaseStream>& stream);
- virtual const std::vector<char> to_string();
+ virtual std::vector<char> to_string();
virtual void close();
diff --git a/libminifi/include/utils/Enum.h b/libminifi/include/utils/Enum.h
index fc4754a..b452307 100644
--- a/libminifi/include/utils/Enum.h
+++ b/libminifi/include/utils/Enum.h
@@ -83,8 +83,8 @@ namespace utils {
#define SMART_ENUM_BODY(Clazz, ...) \
constexpr Clazz(Type value = static_cast<Type>(-1)) : value_{value} {} \
- Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
- Clazz(const char* str) : value_{parse(str).value_} {} \
+ explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \
+ explicit Clazz(const char* str) : value_{parse(str).value_} {} \
private: \
Type value_; \
public: \
diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp
index 8d8ad4a..79334c2 100644
--- a/libminifi/src/FlowControlProtocol.cpp
+++ b/libminifi/src/FlowControlProtocol.cpp
@@ -71,26 +71,26 @@ int FlowControlProtocol::selectClient(int msec) {
}
int FlowControlProtocol::readData(uint8_t *buf, int buflen) {
+ gsl_Expects(buflen >= 0);
int sendSize = buflen;
while (buflen) {
- int status;
- status = selectClient(MAX_READ_TIMEOUT);
- if (status <= 0) {
- return status;
+ const auto selectstatus = selectClient(MAX_READ_TIMEOUT);
+ if (selectstatus <= 0) {
+ return selectstatus;
}
#ifdef WIN32
- status = _read(_socket, buf, buflen);
+ const auto readstatus = _read(_socket, buf, buflen);
#elif !defined(__MACH__)
- status = read(_socket, buf, buflen);
+ const auto readstatus = read(_socket, buf, gsl::narrow<size_t>(buflen));
#else
- status = recv(_socket, buf, buflen, 0);
+ const auto readstatus = recv(_socket, buf, buflen, 0);
#endif
- if (status <= 0) {
- return status;
+ if (readstatus <= 0) {
+ return gsl::narrow<int>(readstatus);
}
- buflen -= status;
- buf += status;
+ buflen -= readstatus;
+ buf += readstatus;
}
return sendSize;
diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp
index 813137b..89096a9 100644
--- a/libminifi/src/FlowFileRecord.cpp
+++ b/libminifi/src/FlowFileRecord.cpp
@@ -161,70 +161,90 @@ bool FlowFileRecord::Persist(const std::shared_ptr<core::Repository>& flowReposi
}
std::shared_ptr<FlowFileRecord> FlowFileRecord::DeSerialize(io::InputStream& inStream, const std::shared_ptr<core::ContentRepository>& content_repo, utils::Identifier& container) {
- int ret;
-
auto file = std::make_shared<FlowFileRecord>();
- ret = inStream.read(file->event_time_);
- if (ret != 8) {
- return {};
+ {
+ const auto ret = inStream.read(file->event_time_);
+ if (ret != 8) {
+ return {};
+ }
}
- ret = inStream.read(file->entry_date_);
- if (ret != 8) {
- return {};
+ {
+ const auto ret = inStream.read(file->entry_date_);
+ if (ret != 8) {
+ return {};
+ }
}
- ret = inStream.read(file->lineage_start_date_);
- if (ret != 8) {
- return {};
+ {
+ const auto ret = inStream.read(file->lineage_start_date_);
+ if (ret != 8) {
+ return {};
+ }
}
- ret = inStream.read(file->uuid_);
- if (ret <= 0) {
- return {};
+ {
+ const auto ret = inStream.read(file->uuid_);
+ if (ret == 0 || io::isError(ret)) {
+ return {};
+ }
}
- ret = inStream.read(container);
- if (ret <= 0) {
- return {};
+ {
+ const auto ret = inStream.read(container);
+ if (ret == 0 || io::isError(ret)) {
+ return {};
+ }
}
// read flow attributes
uint32_t numAttributes = 0;
- ret = inStream.read(numAttributes);
- if (ret != 4) {
- return {};
+ {
+ const auto ret = inStream.read(numAttributes);
+ if (ret != 4) {
+ return {};
+ }
}
for (uint32_t i = 0; i < numAttributes; i++) {
std::string key;
- ret = inStream.read(key, true);
- if (ret <= 0) {
- return {};
+ {
+ const auto ret = inStream.read(key, true);
+ if (ret == 0 || io::isError(ret)) {
+ return {};
+ }
}
std::string value;
- ret = inStream.read(value, true);
- if (ret <= 0) {
- return {};
+ {
+ const auto ret = inStream.read(value, true);
+ if (ret == 0 || io::isError(ret)) {
+ return {};
+ }
}
file->attributes_[key] = value;
}
std::string content_full_path;
- ret = inStream.read(content_full_path);
- if (ret <= 0) {
- return {};
+ {
+ const auto ret = inStream.read(content_full_path);
+ if (ret == 0 || io::isError(ret)) {
+ return {};
+ }
}
- ret = inStream.read(file->size_);
- if (ret != 8) {
- return {};
+ {
+ const auto ret = inStream.read(file->size_);
+ if (ret != 8) {
+ return {};
+ }
}
- ret = inStream.read(file->offset_);
- if (ret != 8) {
- return {};
+ {
+ const auto ret = inStream.read(file->offset_);
+ if (ret != 8) {
+ return {};
+ }
}
file->claim_ = std::make_shared<ResourceClaim>(content_full_path, content_repo);
diff --git a/libminifi/src/c2/ControllerSocketProtocol.cpp b/libminifi/src/c2/ControllerSocketProtocol.cpp
index 756c61c..049a3d0 100644
--- a/libminifi/src/c2/ControllerSocketProtocol.cpp
+++ b/libminifi/src/c2/ControllerSocketProtocol.cpp
@@ -94,8 +94,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::START:
{
std::string componentStr;
- int size = stream->read(componentStr);
- if ( size != -1 ) {
+ const auto size = stream->read(componentStr);
+ if (!io::isError(size)) {
auto components = update_sink_->getComponents(componentStr);
for (const auto& component : components) {
component->start();
@@ -108,8 +108,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::STOP:
{
std::string componentStr;
- int size = stream->read(componentStr);
- if ( size != -1 ) {
+ const auto size = stream->read(componentStr);
+ if (!io::isError(size)) {
auto components = update_sink_->getComponents(componentStr);
for (const auto& component : components) {
component->stop();
@@ -122,8 +122,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::CLEAR:
{
std::string connection;
- int size = stream->read(connection);
- if ( size != -1 ) {
+ const auto size = stream->read(connection);
+ if (!io::isError(size)) {
update_sink_->clearConnection(connection);
}
}
@@ -131,21 +131,25 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::UPDATE:
{
std::string what;
- int size = stream->read(what);
- if (size == -1) {
- logger_->log_debug("Connection broke");
- break;
+ {
+ const auto size = stream->read(what);
+ if (io::isError(size)) {
+ logger_->log_debug("Connection broke");
+ break;
+ }
}
if (what == "flow") {
std::string ff_loc;
- int size = stream->read(ff_loc);
+ {
+ const auto size = stream->read(ff_loc);
+ if (io::isError(size)) {
+ logger_->log_debug("Connection broke");
+ break;
+ }
+ }
std::ifstream tf(ff_loc);
std::string configuration((std::istreambuf_iterator<char>(tf)),
std::istreambuf_iterator<char>());
- if (size == -1) {
- logger_->log_debug("Connection broke");
- break;
- }
update_sink_->applyUpdate("ControllerSocketProtocol", configuration);
}
}
@@ -153,15 +157,15 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
case Operation::DESCRIBE:
{
std::string what;
- int size = stream->read(what);
- if (size == -1) {
+ const auto size = stream->read(what);
+ if (io::isError(size)) {
logger_->log_debug("Connection broke");
break;
}
if (what == "queue") {
std::string connection;
- int size = stream->read(connection);
- if (size == -1) {
+ const auto size_ = stream->read(connection);
+ if (io::isError(size_)) {
logger_->log_debug("Connection broke");
break;
}
@@ -177,8 +181,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
} else if (what == "components") {
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t size = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
- resp.write(size);
+ const auto size_ = gsl::narrow<uint16_t>(update_sink_->getAllComponents().size());
+ resp.write(size_);
for (const auto &component : update_sink_->getAllComponents()) {
resp.write(component->getComponentName());
resp.write(component->isRunning() ? "true" : "false");
@@ -203,8 +207,8 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
} else if (what == "connections") {
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t size = gsl::narrow<uint16_t>(queue_full_.size());
- resp.write(size);
+ const auto size_ = gsl::narrow<uint16_t>(queue_full_.size());
+ resp.write(size_);
for (const auto &connection : queue_full_) {
resp.write(connection.first, false);
}
@@ -213,17 +217,17 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
std::vector<std::string> full_connections;
{
std::lock_guard<std::mutex> lock(controller_mutex_);
- for (auto conn : queue_full_) {
- if (conn.second == true) {
+ for (const auto& conn : queue_full_) {
+ if (conn.second) {
full_connections.push_back(conn.first);
}
}
}
io::BufferStream resp;
resp.write(&head, 1);
- uint16_t full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
+ const auto full_connection_count = gsl::narrow<uint16_t>(full_connections.size());
resp.write(full_connection_count);
- for (auto conn : full_connections) {
+ for (const auto& conn : full_connections) {
resp.write(conn);
}
stream->write(const_cast<uint8_t*>(resp.getBuffer()), gsl::narrow<int>(resp.size()));
@@ -241,10 +245,10 @@ void ControllerSocketProtocol::initialize(core::controller::ControllerServicePro
void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse> &content) {
for (const auto &payload_content : content) {
if (payload_content.name == "Components") {
- for (auto content : payload_content.operation_arguments) {
- bool is_enabled = minifi::utils::StringUtils::toBool(content.second.to_string()).value_or(false);
+ for (const auto& operation_argument : payload_content.operation_arguments) {
+ bool is_enabled = minifi::utils::StringUtils::toBool(operation_argument.second.to_string()).value_or(false);
std::lock_guard<std::mutex> lock(controller_mutex_);
- component_map_[content.first] = is_enabled;
+ component_map_[operation_argument.first] = is_enabled;
}
}
}
@@ -253,21 +257,20 @@ void ControllerSocketProtocol::parse_content(const std::vector<C2ContentResponse
int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
if (server_socket_ == nullptr)
return 0;
- const std::vector<C2ContentResponse> &content = payload.getContent();
for (const auto &pc : payload.getNestedPayloads()) {
if (pc.getLabel() == "flowInfo" || pc.getLabel() == "metrics") {
for (const auto &metrics_payload : pc.getNestedPayloads()) {
if (metrics_payload.getLabel() == "QueueMetrics" || metrics_payload.getLabel() == "queues") {
for (const auto &queue_metrics : metrics_payload.getNestedPayloads()) {
- auto metric_content = queue_metrics.getContent();
- for (const auto &payload_content : queue_metrics.getContent()) {
+ const auto& metric_content = queue_metrics.getContent();
+ for (const auto &payload_content : metric_content) {
uint64_t size = 0;
uint64_t max = 0;
- for (auto content : payload_content.operation_arguments) {
+ for (const auto& content : payload_content.operation_arguments) {
if (content.first == "datasize") {
- size = std::stol(content.second.to_string());
+ size = std::stoull(content.second.to_string());
} else if (content.first == "datasizemax") {
- max = std::stol(content.second.to_string());
+ max = std::stoull(content.second.to_string());
}
}
std::lock_guard<std::mutex> lock(controller_mutex_);
@@ -285,7 +288,7 @@ int16_t ControllerSocketProtocol::heartbeat(const C2Payload &payload) {
}
}
- parse_content(content);
+ parse_content(payload.getContent());
std::vector<uint8_t> buffer;
buffer.resize(1024);
diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp
index e3ce368..5e3454d 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -337,8 +337,8 @@ void ProcessSession::importFrom(io::InputStream&& stream, const std::shared_ptr<
*
*/
void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<core::FlowFile> &flow) {
- std::shared_ptr<ResourceClaim> claim = content_session_->create();
- int max_read = getpagesize();
+ const std::shared_ptr<ResourceClaim> claim = content_session_->create();
+ const auto max_read = gsl::narrow_cast<size_t>(getpagesize());
std::vector<uint8_t> charBuffer(max_read);
try {
@@ -348,10 +348,10 @@ void ProcessSession::importFrom(io::InputStream &stream, const std::shared_ptr<c
if (nullptr == content_stream) {
throw Exception(FILE_OPERATION_EXCEPTION, "Could not obtain claim for " + claim->getContentFullPath());
}
- int position = 0;
- const int max_size = gsl::narrow<int>(stream.size());
+ size_t position = 0;
+ const auto max_size = stream.size();
while (position < max_size) {
- const int read_size = std::min(max_read, max_size - position);
+ const auto read_size = std::min(max_read, max_size - position);
stream.read(charBuffer, read_size);
content_stream->write(charBuffer.data(), read_size);
diff --git a/libminifi/src/core/ProcessSessionReadCallback.cpp b/libminifi/src/core/ProcessSessionReadCallback.cpp
index 8a21f73..0d822c7 100644
--- a/libminifi/src/core/ProcessSessionReadCallback.cpp
+++ b/libminifi/src/core/ProcessSessionReadCallback.cpp
@@ -24,6 +24,7 @@
#include "core/logging/LoggerConfiguration.h"
#include "io/BaseStream.h"
+#include "utils/gsl.h"
namespace org {
namespace apache {
@@ -47,20 +48,16 @@ int64_t ProcessSessionReadCallback::process(const std::shared_ptr<io::BaseStream
size_t size = 0;
uint8_t buffer[8192];
do {
- int read = stream->read(buffer, 8192);
- if (read < 0) {
- return -1;
- }
- if (read == 0) {
- break;
- }
+ const auto read = stream->read(buffer, 8192);
+ if (io::isError(read)) return -1;
+ if (read == 0) break;
if (!_tmpFileOs.write(reinterpret_cast<char*>(buffer), read)) {
return -1;
}
size += read;
} while (size < stream->size());
_writeSucceeded = true;
- return size;
+ return gsl::narrow<int64_t>(size);
}
// Renames tmp file to final destination
diff --git a/libminifi/src/io/BufferStream.cpp b/libminifi/src/io/BufferStream.cpp
index 6d3ebab..bd0a9d2 100644
--- a/libminifi/src/io/BufferStream.cpp
+++ b/libminifi/src/io/BufferStream.cpp
@@ -36,17 +36,16 @@ int BufferStream::write(const uint8_t *value, int size) {
return size;
}
-int BufferStream::read(uint8_t *buf, int len) {
- gsl_Expects(len >= 0);
- int bytes_available_in_buffer = gsl::narrow<int>(buffer_.size() - readOffset_);
- len = std::min(len, bytes_available_in_buffer);
- auto begin = buffer_.begin() + readOffset_;
- std::copy(begin, begin + len, buf);
+size_t BufferStream::read(uint8_t *buf, size_t len) {
+ const auto bytes_available_in_buffer = buffer_.size() - readOffset_;
+ const auto readlen = std::min(len, gsl::narrow<size_t>(bytes_available_in_buffer));
+ const auto begin = buffer_.begin() + gsl::narrow<decltype(buffer_)::difference_type>(readOffset_);
+ std::copy(begin, begin + gsl::narrow<decltype(buffer_)::difference_type>(readlen), buf);
// increase offset for the next read
- readOffset_ += len;
+ readOffset_ += readlen;
- return len;
+ return readlen;
}
} /* namespace io */
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
index 6427447..57c6915 100644
--- a/libminifi/src/io/ClientSocket.cpp
+++ b/libminifi/src/io/ClientSocket.cpp
@@ -47,6 +47,8 @@
#include "core/logging/LoggerConfiguration.h"
#include "utils/file/FileUtils.h"
#include "utils/GeneralUtils.h"
+#include "utils/gsl.h"
+
namespace util = org::apache::nifi::minifi::utils;
namespace mio = org::apache::nifi::minifi::io;
@@ -523,9 +525,8 @@ int Socket::write(const uint8_t *value, int size) {
return bytes;
}
-int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
- gsl_Expects(buflen >= 0);
- int32_t total_read = 0;
+size_t Socket::read(uint8_t *buf, size_t buflen, bool retrieve_all_bytes) {
+ size_t total_read = 0;
while (buflen) {
int16_t fd = select_descriptor(1000);
if (fd < 0) {
@@ -533,9 +534,9 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
logger_->log_debug("fd %d close %i", fd, buflen);
utils::file::FileUtils::close(socket_file_descriptor_);
}
- return -1;
+ return STREAM_ERROR;
}
- int bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
+ const auto bytes_read = recv(fd, reinterpret_cast<char*>(buf), buflen, 0);
logger_->log_trace("Recv call %d", bytes_read);
if (bytes_read <= 0) {
if (bytes_read == 0) {
@@ -545,23 +546,23 @@ int Socket::read(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
// continue
- return -2;
+ return static_cast<size_t>(-2);
}
logger_->log_error("Could not recv on %d (port %d), error code: %d", fd, port_, err);
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// continue
- return -2;
+ return static_cast<size_t>(-2);
}
logger_->log_error("Could not recv on %d (port %d), error: %s", fd, port_, strerror(errno));
#endif // WIN32
}
- return -1;
+ return STREAM_ERROR;
}
- buflen -= bytes_read;
+ buflen -= gsl::narrow<size_t>(bytes_read);
buf += bytes_read;
- total_read += bytes_read;
+ total_read += gsl::narrow<size_t>(bytes_read);
if (!retrieve_all_bytes) {
break;
}
diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp
index 2b5fc70..a3a4ee8 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -40,7 +40,7 @@ DescriptorStream::DescriptorStream(int fd)
logger_(logging::LoggerFactory<DescriptorStream>::getLogger()) {
}
-void DescriptorStream::seek(uint64_t offset) {
+void DescriptorStream::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
#ifdef WIN32
_lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -70,25 +70,24 @@ int DescriptorStream::write(const uint8_t *value, int size) {
}
}
-int DescriptorStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t DescriptorStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
if (!IsNullOrEmpty(buf)) {
#ifdef WIN32
- auto size_read = _read(fd_, buf, buflen);
+ const auto size_read = _read(fd_, buf, buflen);
#else
- auto size_read = ::read(fd_, buf, buflen);
+ const auto size_read = ::read(fd_, buf, buflen);
#endif
if (size_read < 0) {
- return -1;
+ return STREAM_ERROR;
}
- return size_read;
+ return gsl::narrow<size_t>(size_read);
} else {
- return -1;
+ return STREAM_ERROR;
}
}
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 4f759ae..90fb0a2 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -25,6 +25,8 @@
#include "io/FileStream.h"
#include "io/InputStream.h"
#include "io/OutputStream.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -97,13 +99,13 @@ void FileStream::close() {
file_stream_.reset();
}
-void FileStream::seek(uint64_t offset) {
+void FileStream::seek(size_t offset) {
std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_ == nullptr || !file_stream_->is_open()) {
logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
return;
}
- offset_ = gsl::narrow<size_t>(offset);
+ offset_ = offset;
file_stream_->clear();
if (!file_stream_->seekg(offset_))
logging::LOG_ERROR(logger_) << SEEK_ERROR_MSG << SEEKG_CALL_ERROR_MSG;
@@ -142,8 +144,7 @@ int FileStream::write(const uint8_t *value, int size) {
}
}
-int FileStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t FileStream::read(uint8_t *buf, size_t buflen) {
if (buflen == 0) {
return 0;
}
@@ -151,32 +152,31 @@ int FileStream::read(uint8_t *buf, int buflen) {
std::lock_guard<std::mutex> lock(file_lock_);
if (file_stream_ == nullptr || !file_stream_->is_open()) {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_FILE_STREAM_ERROR_MSG;
- return -1;
+ return STREAM_ERROR;
}
- file_stream_->read(reinterpret_cast<char*>(buf), buflen);
+ file_stream_->read(reinterpret_cast<char*>(buf), gsl::narrow<std::streamsize>(buflen));
if (file_stream_->eof() || file_stream_->fail()) {
file_stream_->clear();
seekToEndOfFile(READ_ERROR_MSG);
auto tellg_result = file_stream_->tellg();
if (tellg_result == std::streampos(-1)) {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << TELLG_CALL_ERROR_MSG;
- return -1;
+ return STREAM_ERROR;
}
- size_t len = gsl::narrow<size_t>(tellg_result);
+ const auto len = gsl::narrow<size_t>(tellg_result);
size_t ret = len - offset_;
offset_ = len;
length_ = len;
logging::LOG_DEBUG(logger_) << path_ << " eof bit, ended at " << offset_;
- return gsl::narrow<int>(ret);
+ return ret;
} else {
offset_ += buflen;
- file_stream_->seekp(offset_);
+ file_stream_->seekp(gsl::narrow<std::streamoff>(offset_));
return buflen;
}
-
} else {
logging::LOG_ERROR(logger_) << READ_ERROR_MSG << INVALID_BUFFER_ERROR_MSG;
- return -1;
+ return STREAM_ERROR;
}
}
diff --git a/libminifi/src/io/InputStream.cpp b/libminifi/src/io/InputStream.cpp
index cdd101e..8daa499 100644
--- a/libminifi/src/io/InputStream.cpp
+++ b/libminifi/src/io/InputStream.cpp
@@ -16,13 +16,10 @@
* limitations under the License.
*/
#include <cstdio>
-#include <iostream>
#include <vector>
#include <string>
-#include <algorithm>
#include "io/InputStream.h"
#include "utils/gsl.h"
-#include "utils/OptionalUtils.h"
namespace org {
namespace apache {
@@ -30,67 +27,68 @@ namespace nifi {
namespace minifi {
namespace io {
-int InputStream::read(std::vector<uint8_t>& buffer, int len) {
- if (buffer.size() < gsl::narrow<size_t>(len)) {
+size_t InputStream::read(std::vector<uint8_t>& buffer, size_t len) {
+ if (buffer.size() < len) {
buffer.resize(len);
}
- int ret = read(buffer.data(), len);
- buffer.resize((std::max)(ret, 0));
+ const auto ret = read(buffer.data(), len);
+ if (io::isError(ret)) return ret;
+ buffer.resize(ret);
return ret;
}
-int InputStream::read(bool &value) {
+size_t InputStream::read(bool &value) {
uint8_t buf = 0;
if (read(&buf, 1) != 1) {
- return -1;
+ return STREAM_ERROR;
}
value = buf;
return 1;
}
-int InputStream::read(utils::Identifier &value) {
+size_t InputStream::read(utils::Identifier &value) {
std::string uuidStr;
- int ret = read(uuidStr);
- if (ret < 0) {
+ const auto ret = read(uuidStr);
+ if (isError(ret)) {
return ret;
}
auto optional_uuid = utils::Identifier::parse(uuidStr);
if (!optional_uuid) {
- return -1;
+ return STREAM_ERROR;
}
value = optional_uuid.value();
return ret;
}
-int InputStream::read(std::string &str, bool widen) {
- uint32_t len = 0;
- int ret = 0;
+size_t InputStream::read(std::string &str, bool widen) {
+ uint32_t string_length = 0;
+ size_t length_return = 0;
if (!widen) {
uint16_t shortLength = 0;
- ret = read(shortLength);
- len = shortLength;
+ length_return = read(shortLength);
+ string_length = shortLength;
} else {
- ret = read(len);
+ length_return = read(string_length);
}
- if (ret <= 0) {
- return ret;
+ if (length_return == 0 || isError(length_return)) {
+ return length_return;
}
- if (len == 0) {
- str = "";
- return ret;
+ if (string_length == 0) {
+ str.clear();
+ return length_return;
}
- std::vector<uint8_t> buffer(len);
- uint32_t bytes_read = gsl::narrow<uint32_t>(read(buffer.data(), len));
- if (bytes_read != len) {
- return -1;
+ std::vector<uint8_t> buffer(string_length);
+ const auto read_return = read(buffer.data(), string_length);
+ if (read_return != string_length) {
+ return read_return;
}
- str = std::string(reinterpret_cast<const char*>(buffer.data()), len);
- return ret + len;
+ str = std::string(reinterpret_cast<const char*>(buffer.data()), string_length);
+ return length_return + string_length;
}
} /* namespace io */
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c5b0b3f..ea93dea 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -23,6 +23,8 @@
#include <string>
#include <Exception.h>
#include "io/validation.h"
+#include "utils/gsl.h"
+
namespace org {
namespace apache {
namespace nifi {
@@ -34,7 +36,7 @@ SecureDescriptorStream::SecureDescriptorStream(int fd, SSL *ssl)
logger_(logging::LoggerFactory<SecureDescriptorStream>::getLogger()) {
}
-void SecureDescriptorStream::seek(uint64_t offset) {
+void SecureDescriptorStream::seek(size_t offset) {
std::lock_guard<std::recursive_mutex> lock(file_lock_);
#ifdef WIN32
_lseeki64(fd_, gsl::narrow<int64_t>(offset), 0x00);
@@ -71,34 +73,30 @@ int SecureDescriptorStream::write(const uint8_t *value, int size) {
}
}
-int SecureDescriptorStream::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
+size_t SecureDescriptorStream::read(uint8_t * const buf, const size_t buflen) {
if (buflen == 0) {
return 0;
}
- if (!IsNullOrEmpty(buf)) {
- int total_read = 0;
- int status = 0;
- while (buflen) {
- int sslStatus;
- do {
- status = SSL_read(ssl_, buf, buflen);
- sslStatus = SSL_get_error(ssl_, status);
- } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
-
- if (status < 0)
- break;
-
- buflen -= status;
- buf += status;
- total_read += status;
- }
+ if (IsNullOrEmpty(buf)) return STREAM_ERROR;
+ size_t total_read = 0;
+ uint8_t* writepos = buf;
+ while (buflen > total_read) {
+ int status;
+ int sslStatus;
+ do {
+ const auto ssl_read_size = gsl::narrow<int>(std::min(buflen - total_read, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+ status = SSL_read(ssl_, writepos, ssl_read_size);
+ sslStatus = SSL_get_error(ssl_, status);
+ } while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
- return total_read;
+ if (status < 0)
+ break;
- } else {
- return -1;
+ writepos += status;
+ total_read += gsl::narrow<size_t>(status);
}
+
+ return total_read;
}
} /* namespace io */
diff --git a/libminifi/src/io/tls/TLSSocket.cpp b/libminifi/src/io/tls/TLSSocket.cpp
index a19e5b9..93422fa 100644
--- a/libminifi/src/io/tls/TLSSocket.cpp
+++ b/libminifi/src/io/tls/TLSSocket.cpp
@@ -350,36 +350,38 @@ int16_t TLSSocket::select_descriptor(const uint16_t msec) {
return -1;
}
-int TLSSocket::read(uint8_t *buf, int buflen, bool /*retrieve_all_bytes*/) {
- gsl_Expects(buflen >= 0);
- int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen, bool /*retrieve_all_bytes*/) {
+ size_t total_read = 0;
int status = 0;
int loc = 0;
int16_t fd = select_descriptor(1000);
if (fd < 0) {
close();
- return -1;
+ return STREAM_ERROR;
}
auto fd_ssl = get_ssl(fd);
if (IsNullOrEmpty(fd_ssl)) {
- return -1;
+ return STREAM_ERROR;
}
if (!SSL_pending(fd_ssl)) {
return 0;
}
while (buflen) {
if (fd <= 0) {
- return -1;
+ return STREAM_ERROR;
}
int sslStatus;
do {
- status = SSL_read(fd_ssl, buf + loc, buflen);
+ const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+ status = SSL_read(fd_ssl, buf + loc, ssl_read_size);
sslStatus = SSL_get_error(fd_ssl, status);
} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ && SSL_pending(fd_ssl));
- buflen -= status;
+ if (status < 0) break;
+
+ buflen -= gsl::narrow<size_t>(status);
loc += status;
- total_read += status;
+ total_read += gsl::narrow<size_t>(status);
}
return total_read;
@@ -417,33 +419,33 @@ int TLSSocket::write(const uint8_t *value, int size) {
return writeData(value, size, fd);
}
-int TLSSocket::read(uint8_t *buf, int buflen) {
- gsl_Expects(buflen >= 0);
- int total_read = 0;
+size_t TLSSocket::read(uint8_t *buf, size_t buflen) {
+ size_t total_read = 0;
int status = 0;
while (buflen) {
- int16_t fd = select_descriptor(1000);
+ const int16_t fd = select_descriptor(1000);
if (fd < 0) {
close();
- return -1;
+ return STREAM_ERROR;
}
int sslStatus;
do {
- auto fd_ssl = get_ssl(fd);
+ const auto fd_ssl = get_ssl(fd);
if (IsNullOrEmpty(fd_ssl)) {
- return -1;
+ return STREAM_ERROR;
}
- status = SSL_read(fd_ssl, buf, buflen);
+ const auto ssl_read_size = gsl::narrow<int>(std::min(buflen, gsl::narrow<size_t>(std::numeric_limits<int>::max())));
+ status = SSL_read(fd_ssl, buf, ssl_read_size);
sslStatus = SSL_get_error(fd_ssl, status);
} while (status < 0 && sslStatus == SSL_ERROR_WANT_READ);
if (status < 0)
break;
- buflen -= status;
+ buflen -= gsl::narrow<size_t>(status);
buf += status;
- total_read += status;
+ total_read += gsl::narrow<size_t>(status);
}
return total_read;
diff --git a/libminifi/src/provenance/Provenance.cpp b/libminifi/src/provenance/Provenance.cpp
index f9af7f9..f3b0715 100644
--- a/libminifi/src/provenance/Provenance.cpp
+++ b/libminifi/src/provenance/Provenance.cpp
@@ -235,145 +235,191 @@ bool ProvenanceEventRecord::Serialize(const std::shared_ptr<core::SerializableCo
}
bool ProvenanceEventRecord::DeSerialize(const uint8_t *buffer, const size_t bufferSize) {
- int ret;
-
org::apache::nifi::minifi::io::BufferStream outStream(buffer, gsl::narrow<unsigned int>(bufferSize));
- ret = outStream.read(uuid_);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(uuid_);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
uint32_t eventType;
- ret = outStream.read(eventType);
- if (ret != 4) {
- return false;
+ {
+ const auto ret = outStream.read(eventType);
+ if (ret != 4) {
+ return false;
+ }
}
- this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
- ret = outStream.read(this->_eventTime);
- if (ret != 8) {
- return false;
+ this->_eventType = (ProvenanceEventRecord::ProvenanceEventType) eventType;
+ {
+ const auto ret = outStream.read(this->_eventTime);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_entryDate);
- if (ret != 8) {
- return false;
+ {
+ const auto ret = outStream.read(this->_entryDate);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_eventDuration);
- if (ret != 8) {
- return false;
+ {
+ const auto ret = outStream.read(this->_eventDuration);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_lineageStartDate);
- if (ret != 8) {
- return false;
+ {
+ const auto ret = outStream.read(this->_lineageStartDate);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_componentId);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_componentId);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = outStream.read(this->_componentType);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_componentType);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = outStream.read(this->flow_uuid_);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->flow_uuid_);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = outStream.read(this->_details);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_details);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
// read flow attributes
uint32_t numAttributes = 0;
- ret = outStream.read(numAttributes);
- if (ret != 4) {
- return false;
+ {
+ const auto ret = outStream.read(numAttributes);
+ if (ret != 4) {
+ return false;
+ }
}
for (uint32_t i = 0; i < numAttributes; i++) {
std::string key;
- ret = outStream.read(key);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(key);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
std::string value;
- ret = outStream.read(value);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(value);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
this->_attributes[key] = value;
}
- ret = outStream.read(this->_contentFullPath);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_contentFullPath);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = outStream.read(this->_size);
- if (ret != 8) {
- return false;
+ {
+ const auto ret = outStream.read(this->_size);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_offset);
- if (ret != 8) {
- return false;
+ {
+ const auto ret = outStream.read(this->_offset);
+ if (ret != 8) {
+ return false;
+ }
}
- ret = outStream.read(this->_sourceQueueIdentifier);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_sourceQueueIdentifier);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
if (this->_eventType == ProvenanceEventRecord::FORK || this->_eventType == ProvenanceEventRecord::CLONE || this->_eventType == ProvenanceEventRecord::JOIN) {
// read UUIDs
uint32_t number = 0;
- ret = outStream.read(number);
- if (ret != 4) {
- return false;
+ {
+ const auto ret = outStream.read(number);
+ if (ret != 4) {
+ return false;
+ }
}
for (uint32_t i = 0; i < number; i++) {
utils::Identifier parentUUID;
- ret = outStream.read(parentUUID);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(parentUUID);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
this->addParentUuid(parentUUID);
}
number = 0;
- ret = outStream.read(number);
- if (ret != 4) {
- return false;
+ {
+ const auto ret = outStream.read(number);
+ if (ret != 4) {
+ return false;
+ }
}
for (uint32_t i = 0; i < number; i++) {
utils::Identifier childUUID;
- ret = outStream.read(childUUID);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(childUUID);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
this->addChildUuid(childUUID);
}
} else if (this->_eventType == ProvenanceEventRecord::SEND || this->_eventType == ProvenanceEventRecord::FETCH) {
- ret = outStream.read(this->_transitUri);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_transitUri);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
} else if (this->_eventType == ProvenanceEventRecord::RECEIVE) {
- ret = outStream.read(this->_transitUri);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_transitUri);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = outStream.read(this->_sourceSystemFlowFileIdentifier);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
}
diff --git a/libminifi/src/sitetosite/RawSocketProtocol.cpp b/libminifi/src/sitetosite/RawSocketProtocol.cpp
index 7ea213f..b60b2f4 100644
--- a/libminifi/src/sitetosite/RawSocketProtocol.cpp
+++ b/libminifi/src/sitetosite/RawSocketProtocol.cpp
@@ -114,28 +114,31 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);
- int ret = peer_->write(getResourceName());
-
- logger_->log_trace("result of writing resource name is %i", ret);
- if (ret <= 0) {
- logger_->log_debug("result of writing resource name is %i", ret);
- // tearDown();
- return false;
+ {
+ const auto ret = peer_->write(getResourceName());
+ logger_->log_trace("result of writing resource name is %i", ret);
+ if (ret <= 0) {
+ logger_->log_debug("result of writing resource name is %i", ret);
+ // tearDown();
+ return false;
+ }
}
- ret = peer_->write(_currentVersion);
-
- if (ret <= 0) {
- logger_->log_debug("result of writing version is %i", ret);
- return false;
+ {
+ const auto ret = peer_->write(_currentVersion);
+ if (ret <= 0) {
+ logger_->log_debug("result of writing version is %i", ret);
+ return false;
+ }
}
uint8_t statusCode;
- ret = peer_->read(statusCode);
-
- if (ret <= 0) {
- logger_->log_debug("result of writing version status code %i", ret);
- return false;
+ {
+ const auto ret = peer_->read(statusCode);
+ if (ret == 0 || io::isError(ret)) {
+ logger_->log_debug("result of writing version status code %i", ret);
+ return false;
+ }
}
logger_->log_debug("status code is %i", statusCode);
switch (statusCode) {
@@ -144,9 +147,11 @@ bool RawSiteToSiteClient::initiateResourceNegotiation() {
return true;
case DIFFERENT_RESOURCE_VERSION:
uint32_t serverVersion;
- ret = peer_->read(serverVersion);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->read(serverVersion);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
@@ -178,36 +183,40 @@ bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);
- int ret = peer_->write(getCodecResourceName());
-
- if (ret <= 0) {
- logger_->log_debug("result of getCodecResourceName is %i", ret);
- return false;
+ {
+ const auto ret = peer_->write(getCodecResourceName());
+ if (ret <= 0) {
+ logger_->log_debug("result of getCodecResourceName is %i", ret);
+ return false;
+ }
}
- ret = peer_->write(_currentCodecVersion);
-
- if (ret <= 0) {
- logger_->log_debug("result of _currentCodecVersion is %i", ret);
- return false;
+ {
+ const auto ret = peer_->write(_currentCodecVersion);
+ if (ret <= 0) {
+ logger_->log_debug("result of _currentCodecVersion is %i", ret);
+ return false;
+ }
}
uint8_t statusCode;
- ret = peer_->read(statusCode);
-
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->read(statusCode);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
-
switch (statusCode) {
case RESOURCE_OK:
logger_->log_trace("Site2Site Codec Negotiate version OK");
return true;
case DIFFERENT_RESOURCE_VERSION:
uint32_t serverVersion;
- ret = peer_->read(serverVersion);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->read(serverVersion);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
@@ -237,10 +246,11 @@ bool RawSiteToSiteClient::handShake() {
logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());
_commsIdentifier = id_generator_->generate();
- int ret = peer_->write(_commsIdentifier);
-
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->write(_commsIdentifier);
+ if (ret <= 0) {
+ return false;
+ }
}
std::map<std::string, std::string> properties;
@@ -257,27 +267,33 @@ bool RawSiteToSiteClient::handShake() {
}
if (_currentVersion >= 3) {
- ret = peer_->write(peer_->getURL());
+ const auto ret = peer_->write(peer_->getURL());
if (ret <= 0) {
return false;
}
}
- uint32_t size = gsl::narrow<uint32_t>(properties.size());
- ret = peer_->write(size);
- if (ret <= 0) {
- return false;
+ {
+ const auto size = gsl::narrow<uint32_t>(properties.size());
+ const auto ret = peer_->write(size);
+ if (ret <= 0) {
+ return false;
+ }
}
std::map<std::string, std::string>::iterator it;
for (it = properties.begin(); it != properties.end(); it++) {
- ret = peer_->write(it->first);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->write(it->first);
+ if (ret <= 0) {
+ return false;
+ }
}
- ret = peer_->write(it->second);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = peer_->write(it->second);
+ if (ret <= 0) {
+ return false;
+ }
}
logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", it->first, it->second);
}
@@ -285,10 +301,11 @@ bool RawSiteToSiteClient::handShake() {
RespondCode code;
std::string message;
- ret = readRespond(nullptr, code, message);
-
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = readRespond(nullptr, code, message);
+ if (ret <= 0) {
+ return false;
+ }
}
std::string error;
@@ -310,13 +327,11 @@ bool RawSiteToSiteClient::handShake() {
// Unknown error
default:
logger_->log_error("HandShake Failed because of unknown respond code %d", code);
- ret = -1;
return false;
}
// All known error cases handled here
logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), error);
- ret = -1;
return false;
}
@@ -334,48 +349,54 @@ void RawSiteToSiteClient::tearDown() {
bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
if (establish() && handShake()) {
- int status = writeRequestType(REQUEST_PEER_LIST);
-
- if (status <= 0) {
+ if (writeRequestType(REQUEST_PEER_LIST) <= 0) {
tearDown();
return false;
}
- uint32_t number;
- status = peer_->read(number);
-
- if (status <= 0) {
- tearDown();
- return false;
+ uint32_t number_of_peers;
+ {
+ const auto ret = peer_->read(number_of_peers);
+ if (ret == 0 || io::isError(ret)) {
+ tearDown();
+ return false;
+ }
}
- for (uint32_t i = 0; i < number; i++) {
+ for (uint32_t i = 0; i < number_of_peers; i++) {
std::string host;
- status = peer_->read(host);
- if (status <= 0) {
- tearDown();
- return false;
+ {
+ const auto ret = peer_->read(host);
+ if (ret == 0 || io::isError(ret)) {
+ tearDown();
+ return false;
+ }
}
uint32_t port;
- status = peer_->read(port);
- if (status <= 0) {
- tearDown();
- return false;
+ {
+ const auto ret = peer_->read(port);
+ if (ret == 0 || io::isError(ret)) {
+ tearDown();
+ return false;
+ }
}
uint8_t secure;
- status = peer_->read(secure);
- if (status <= 0) {
- tearDown();
- return false;
+ {
+ const auto ret = peer_->read(secure);
+ if (ret == 0 || io::isError(ret)) {
+ tearDown();
+ return false;
+ }
}
uint32_t count;
- status = peer_->read(count);
- if (status <= 0) {
- tearDown();
- return false;
+ {
+ const auto ret = peer_->read(count);
+ if (ret == 0 || io::isError(ret)) {
+ tearDown();
+ return false;
+ }
}
- PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
- peers.push_back(std::move(status));
+ peers.push_back(PeerStatus{std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true});
logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
}
@@ -397,15 +418,14 @@ bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
int RawSiteToSiteClient::readRequestType(RequestType &type) {
std::string requestTypeStr;
- int ret = peer_->read(requestTypeStr);
-
- if (ret <= 0)
- return ret;
+ const auto ret = peer_->read(requestTypeStr);
+ if (ret == 0 || io::isError(ret))
+ return static_cast<int>(ret);
for (int i = NEGOTIATE_FLOWFILE_CODEC; i <= SHUTDOWN; i++) {
if (SiteToSiteRequest::RequestTypeStr[i] == requestTypeStr) {
type = (RequestType) i;
- return ret;
+ return static_cast<int>(ret);
}
}
@@ -417,7 +437,7 @@ int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transac
}
int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
- return writeResponse(transaction, code, message);
+ return writeResponse(transaction, code, std::move(message));
}
bool RawSiteToSiteClient::negotiateCodec() {
diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp
index 1833144..60cef54 100644
--- a/libminifi/src/sitetosite/SiteToSiteClient.cpp
+++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp
@@ -31,47 +31,43 @@ namespace sitetosite {
int SiteToSiteClient::readResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode &code, std::string &message) {
uint8_t firstByte;
-
- int ret = peer_->read(firstByte);
-
- if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1)
- return -1;
+ {
+ const auto ret = peer_->read(firstByte);
+ if (ret == 0 || io::isError(ret) || firstByte != CODE_SEQUENCE_VALUE_1)
+ return -1;
+ }
uint8_t secondByte;
-
- ret = peer_->read(secondByte);
-
- if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2)
- return -1;
+ {
+ const auto ret = peer_->read(secondByte);
+ if (ret == 0 || io::isError(ret) || secondByte != CODE_SEQUENCE_VALUE_2)
+ return -1;
+ }
uint8_t thirdByte;
-
- ret = peer_->read(thirdByte);
-
- if (ret <= 0)
- return ret;
+ {
+ const auto ret = peer_->read(thirdByte);
+ if (ret == 0 || io::isError(ret))
+ return static_cast<int>(ret);
+ }
code = (RespondCode) thirdByte;
-
RespondCodeContext *resCode = this->getRespondCodeContext(code);
-
- if (resCode == NULL) {
- // Not a valid respond code
+ if (!resCode) {
return -1;
}
if (resCode->hasDescription) {
- ret = peer_->read(message);
- if (ret <= 0)
+ const auto ret = peer_->read(message);
+ if (ret == 0 || io::isError(ret))
return -1;
}
return gsl::narrow<int>(3 + message.size());
}
void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID) {
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
auto it = this->known_transactions_.find(transactionID);
-
if (it == known_transactions_.end()) {
return;
} else {
@@ -85,7 +81,7 @@ void SiteToSiteClient::deleteTransaction(const utils::Identifier& transactionID)
int SiteToSiteClient::writeResponse(const std::shared_ptr<Transaction>& /*transaction*/, RespondCode code, std::string message) {
RespondCodeContext *resCode = this->getRespondCodeContext(code);
- if (resCode == NULL) {
+ if (!resCode) {
// Not a valid respond code
return -1;
}
@@ -205,7 +201,7 @@ bool SiteToSiteClient::transferFlowFiles(const std::shared_ptr<core::ProcessCont
bool SiteToSiteClient::confirm(const utils::Identifier& transactionID) {
int ret;
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
if (peer_state_ != READY) {
bootstrap();
@@ -529,8 +525,7 @@ int16_t SiteToSiteClient::send(const utils::Identifier &transactionID, DataPacke
}
bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacket *packet, bool &eof) {
- int ret;
- std::shared_ptr<Transaction> transaction = NULL;
+ std::shared_ptr<Transaction> transaction;
if (peer_state_ != READY) {
bootstrap();
@@ -568,9 +563,7 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
RespondCode code;
std::string message;
- ret = readResponse(transaction, code, message);
-
- if (ret <= 0) {
+ if (readResponse(transaction, code, message) <= 0) {
return false;
}
if (code == CONTINUE_TRANSACTION) {
@@ -595,9 +588,11 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
// start to read the packet
uint32_t numAttributes;
- ret = transaction->getStream().read(numAttributes);
- if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) {
- return false;
+ {
+ const auto ret = transaction->getStream().read(numAttributes);
+ if (ret == 0 || io::isError(ret) || numAttributes > MAX_NUM_ATTRIBUTES) {
+ return false;
+ }
}
// read the attributes
@@ -605,22 +600,28 @@ bool SiteToSiteClient::receive(const utils::Identifier& transactionID, DataPacke
for (unsigned int i = 0; i < numAttributes; i++) {
std::string key;
std::string value;
- ret = transaction->getStream().read(key, true);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = transaction->getStream().read(key, true);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
- ret = transaction->getStream().read(value, true);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = transaction->getStream().read(value, true);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
packet->_attributes[key] = value;
logger_->log_debug("Site2Site transaction %s receives attribute key %s value %s", transactionID.to_string(), key, value);
}
uint64_t len;
- ret = transaction->getStream().read(len);
- if (ret <= 0) {
- return false;
+ {
+ const auto ret = transaction->getStream().read(len);
+ if (ret == 0 || io::isError(ret)) {
+ return false;
+ }
}
packet->_size = len;
diff --git a/libminifi/src/utils/ByteArrayCallback.cpp b/libminifi/src/utils/ByteArrayCallback.cpp
index 33c9634..a64c3ac 100644
--- a/libminifi/src/utils/ByteArrayCallback.cpp
+++ b/libminifi/src/utils/ByteArrayCallback.cpp
@@ -35,10 +35,10 @@ int64_t ByteOutputCallback::process(const std::shared_ptr<io::BaseStream>& strea
if (stream->size() > 0) {
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[stream->size()]);
readFully(buffer.get(), stream->size());
- stream->read(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(stream->size()));
- return stream->size();
+ stream->read(reinterpret_cast<uint8_t*>(buffer.get()), stream->size());
+ return gsl::narrow<int64_t>(stream->size());
}
- return size_.load();
+ return gsl::narrow<int64_t>(size_.load());
}
int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& stream) {
@@ -46,7 +46,7 @@ int64_t StreamOutputCallback::process(const std::shared_ptr<io::BaseStream>& str
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[size_.load()]);
auto written = readFully(buffer.get(), size_);
stream->write(reinterpret_cast<uint8_t*>(buffer.get()), gsl::narrow<int>(written));
- return stream->size();
+ return gsl::narrow<int64_t>(stream->size());
}
void StreamOutputCallback::write(char *data, size_t size) {
@@ -55,7 +55,7 @@ void StreamOutputCallback::write(char *data, size_t size) {
write_and_notify(data, size);
}
-const std::vector<char> ByteOutputCallback::to_string() {
+std::vector<char> ByteOutputCallback::to_string() {
std::vector<char> buffer;
buffer.resize(size_.load());
readFully(buffer.data(), size_.load());
diff --git a/libminifi/test/BufferReader.h b/libminifi/test/BufferReader.h
index df39220..8ce8ff2 100644
--- a/libminifi/test/BufferReader.h
+++ b/libminifi/test/BufferReader.h
@@ -24,20 +24,21 @@
#include <vector>
#include "FlowFileRecord.h"
+#include "Stream.h"
#include "utils/gsl.h"
class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
public:
explicit BufferReader(std::vector<uint8_t>& buffer) : buffer_(buffer) {}
- int write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
+ size_t write(org::apache::nifi::minifi::io::BaseStream& input, std::size_t len) {
uint8_t tmpBuffer[4096]{};
std::size_t remaining_len = len;
- int total_read = 0;
+ size_t total_read = 0;
while (remaining_len > 0) {
- auto ret = input.read(tmpBuffer, gsl::narrow<int>(std::min(remaining_len, sizeof(tmpBuffer))));
+ const auto ret = input.read(tmpBuffer, std::min(remaining_len, sizeof(tmpBuffer)));
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (minifi::io::isError(ret)) return ret;
remaining_len -= ret;
total_read += ret;
auto prevSize = buffer_.size();
@@ -48,7 +49,8 @@ class BufferReader : public org::apache::nifi::minifi::InputStreamCallback {
}
int64_t process(const std::shared_ptr<org::apache::nifi::minifi::io::BaseStream>& stream) {
- return write(*stream.get(), stream->size());
+ const auto write_result = write(*stream.get(), stream->size());
+ return minifi::io::isError(write_result) ? -1 : gsl::narrow<int64_t>(write_result);
}
private:
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index 76507df..de1a774 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -41,29 +41,28 @@
#include "processors/PutFile.h"
#include "utils/file/FileUtils.h"
#include "../Utils.h"
+#include "utils/gsl.h"
class ReadCallback: public minifi::InputStreamCallback {
public:
- explicit ReadCallback(size_t size) :
- read_size_(0) {
- buffer_size_ = size;
- buffer_ = new uint8_t[buffer_size_];
- archive_buffer_ = nullptr;
- archive_buffer_size_ = 0;
- }
- ~ReadCallback() {
- if (buffer_)
- delete[] buffer_;
- if (archive_buffer_)
- delete[] archive_buffer_;
- }
- int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
+ explicit ReadCallback(size_t size)
+ :buffer_size_{size} // default member initializers use this
+ {}
+ ReadCallback(const ReadCallback&) = delete;
+ ReadCallback(ReadCallback&&) = delete;
+ ReadCallback& operator=(const ReadCallback&) = delete;
+ ReadCallback& operator=(ReadCallback&&) = delete;
+
+ ~ReadCallback() override {
+ delete[] buffer_;
+ delete[] archive_buffer_;
+ }
+ int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) override {
int64_t total_read = 0;
- int64_t ret = 0;
do {
- ret = stream->read(buffer_ + read_size_, gsl::narrow<int>(buffer_size_ - read_size_));
+ const auto ret = stream->read(buffer_ + read_size_, buffer_size_ - read_size_);
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (minifi::io::isError(ret)) return -1;
read_size_ += gsl::narrow<size_t>(ret);
total_read += ret;
} while (buffer_size_ != read_size_);
@@ -78,18 +77,18 @@ class ReadCallback: public minifi::InputStreamCallback {
struct archive_entry *ae;
REQUIRE(archive_read_next_header(a, &ae) == ARCHIVE_OK);
- int size = gsl::narrow<int>(archive_entry_size(ae));
+ const auto size = archive_entry_size(ae);
archive_buffer_ = new char[size];
archive_buffer_size_ = size;
- archive_read_data(a, archive_buffer_, size);
+ archive_read_data(a, archive_buffer_, gsl::narrow<size_t>(size));
archive_read_free(a);
}
- uint8_t *buffer_;
size_t buffer_size_;
- size_t read_size_;
- char *archive_buffer_;
- int archive_buffer_size_;
+ uint8_t *buffer_ = new uint8_t[buffer_size_];
+ size_t read_size_ = 0;
+ char *archive_buffer_ = nullptr;
+ int64_t archive_buffer_size_ = 0;
};
/**
diff --git a/libminifi/test/archive-tests/MergeFileTests.cpp b/libminifi/test/archive-tests/MergeFileTests.cpp
index 60e233f..4ad473d 100644
--- a/libminifi/test/archive-tests/MergeFileTests.cpp
+++ b/libminifi/test/archive-tests/MergeFileTests.cpp
@@ -88,9 +88,9 @@ class FixedBuffer : public minifi::InputStreamCallback {
REQUIRE(size_ + len <= capacity_);
int total_read = 0;
do {
- auto ret = input.read(end(), gsl::narrow<int>(len));
+ const auto ret = input.read(end(), len);
if (ret == 0) break;
- if (ret < 0) return ret;
+ if (minifi::io::isError(ret)) return -1;
size_ += ret;
len -= ret;
total_read += ret;
@@ -98,7 +98,7 @@ class FixedBuffer : public minifi::InputStreamCallback {
return total_read;
}
int64_t process(const std::shared_ptr<minifi::io::BaseStream>& stream) {
- return write(*stream.get(), capacity_);
+ return write(*stream, capacity_);
}
private:
@@ -111,8 +111,9 @@ std::vector<FixedBuffer> read_archives(const FixedBuffer& input) {
class ArchiveEntryReader {
public:
explicit ArchiveEntryReader(archive* arch) : arch(arch) {}
- int read(uint8_t* out, std::size_t len) {
- return gsl::narrow<int>(archive_read_data(arch, out, len));
+ size_t read(uint8_t* out, std::size_t len) {
+ const auto ret = archive_read_data(arch, out, len);
+ return ret < 0 ? minifi::io::STREAM_ERROR : gsl::narrow<size_t>(ret);
}
private:
archive* arch;
diff --git a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
index 059dfa9..f63947b 100644
--- a/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
+++ b/libminifi/test/rocksdb-tests/ContentSessionTests.cpp
@@ -56,12 +56,10 @@ const std::shared_ptr<minifi::io::BaseStream>& operator>>(const std::shared_ptr<
str = "";
uint8_t buffer[4096]{};
while (true) {
- auto ret = stream->read(buffer, sizeof(buffer));
- REQUIRE(ret >= 0);
- if (ret == 0) {
- break;
- }
- str += std::string{reinterpret_cast<char*>(buffer), static_cast<std::size_t>(ret)};
+ const auto ret = stream->read(buffer, sizeof(buffer));
+ REQUIRE(!minifi::io::isError(ret));
+ if (ret == 0) { break; }
+ str += std::string{reinterpret_cast<char*>(buffer), ret};
}
return stream;
}
diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
index 6409f3e..712d9f8 100644
--- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
+++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp
@@ -103,8 +103,8 @@ TEST_CASE("Delete Claim", "[TestDBCR2]") {
std::string readstr;
- // -1 tell us we have an invalid stream
- REQUIRE(read_stream->read(readstr) == -1);
+ // error tells us we have an invalid stream
+ REQUIRE(minifi::io::isError(read_stream->read(readstr)));
}
TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
@@ -139,8 +139,8 @@ TEST_CASE("Test Empty Claim", "[TestDBCR3]") {
std::string readstr;
- // -1 tell us we have an invalid stream
- REQUIRE(read_stream->read(readstr) == -1);
+ // error tells us we have an invalid stream
+ REQUIRE(minifi::io::isError(read_stream->read(readstr)));
}
TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
@@ -179,7 +179,6 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR4]") {
std::string readstr;
- // -1 tell us we have an invalid stream
read_stream->read(readstr);
REQUIRE(readstr == "well hello there");
@@ -227,7 +226,6 @@ TEST_CASE("Delete Remove Count Claim", "[TestDBCR5]") {
std::string readstr;
- // -1 tell us we have an invalid stream
read_stream->read(readstr);
REQUIRE(readstr == "well hello there");
diff --git a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
index 88b3929..69336b8 100644
--- a/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
+++ b/libminifi/test/rocksdb-tests/RocksDBStreamTests.cpp
@@ -72,5 +72,5 @@ TEST_CASE_METHOD(RocksDBStreamTest, "Read zero bytes") {
minifi::io::RocksDbStream nonExistingStream("two", gsl::make_not_null(db.get()));
- REQUIRE(nonExistingStream.read(nullptr, 0) == -1);
+ REQUIRE(minifi::io::isError(nonExistingStream.read(nullptr, 0)));
}
diff --git a/libminifi/test/unit/FileStreamTests.cpp b/libminifi/test/unit/FileStreamTests.cpp
index b13dcfa..57bae57 100644
--- a/libminifi/test/unit/FileStreamTests.cpp
+++ b/libminifi/test/unit/FileStreamTests.cpp
@@ -45,7 +45,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -59,7 +59,7 @@ TEST_CASE("TestFileOverWrite", "[TestFiles]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -83,7 +83,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -97,7 +97,7 @@ TEST_CASE("TestFileBadArgumentNoChange", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -121,7 +121,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -135,7 +135,7 @@ TEST_CASE("TestFileBadArgumentNoChange2", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(verifybuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(verifybuffer, stream.size()) == stream.size());
data = verifybuffer.data();
@@ -159,7 +159,7 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -173,11 +173,11 @@ TEST_CASE("TestFileBadArgumentNoChange3", "[TestLoader]") {
std::vector<uint8_t> verifybuffer;
- REQUIRE(stream.read(nullptr, gsl::narrow<int>(stream.size())) == -1);
+ REQUIRE(minifi::io::isError(stream.read(nullptr, stream.size())));
data = verifybuffer.data();
- REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()) == "");
+ REQUIRE(std::string(reinterpret_cast<char*>(data), verifybuffer.size()).empty());
std::remove(ss.str().c_str());
}
@@ -197,7 +197,7 @@ TEST_CASE("TestFileBeyondEnd3", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -232,7 +232,7 @@ TEST_CASE("TestFileExceedSize", "[TestLoader]") {
minifi::io::FileStream stream(path, 0, true);
std::vector<uint8_t> readBuffer;
- REQUIRE(stream.read(readBuffer, gsl::narrow<int>(stream.size())) == stream.size());
+ REQUIRE(stream.read(readBuffer, stream.size()) == stream.size());
uint8_t* data = readBuffer.data();
@@ -280,7 +280,7 @@ TEST_CASE("Non-existing file read/write test") {
REQUIRE(test_controller.getLog().getInstance().contains("Error writing to file: invalid file stream", std::chrono::seconds(0)));
std::vector<uint8_t> readBuffer;
stream.seek(0);
- REQUIRE(stream.read(readBuffer, 1) == -1);
+ REQUIRE(minifi::io::isError(stream.read(readBuffer, 1)));
REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid file stream", std::chrono::seconds(0)));
}
@@ -300,10 +300,10 @@ TEST_CASE("Existing file read/write test") {
REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error writing to file", std::chrono::seconds(0)));
std::vector<uint8_t> readBuffer;
stream.seek(0);
- REQUIRE_FALSE(stream.read(readBuffer, 11) == -1);
+ REQUIRE_FALSE(minifi::io::isError(stream.read(readBuffer, 11)));
REQUIRE_FALSE(test_controller.getLog().getInstance().contains("Error reading from file", std::chrono::seconds(0)));
stream.seek(0);
- REQUIRE(stream.read(nullptr, 11) == -1);
+ REQUIRE(minifi::io::isError(stream.read(nullptr, 11)));
REQUIRE(test_controller.getLog().getInstance().contains("Error reading from file: invalid buffer", std::chrono::seconds(0)));
}
diff --git a/libminifi/test/unit/SiteToSiteHelper.h b/libminifi/test/unit/SiteToSiteHelper.h
index b0c9139..6e913b9 100644
--- a/libminifi/test/unit/SiteToSiteHelper.h
+++ b/libminifi/test/unit/SiteToSiteHelper.h
@@ -60,7 +60,7 @@ class SiteToSiteResponder : public minifi::io::BaseStream {
* @param len length to read
* @return resulting read size
**/
- int read(uint8_t *value, int len) override {
+ size_t read(uint8_t *value, size_t len) override {
return server_responses_.read(value, len);
}
};
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index bdb7e37..d3808a1 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -459,7 +459,7 @@ int get_content(const flow_file_record* ff, uint8_t* target, int size) {
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
*content_repo);
auto stream = (*content_repo)->read(*claim);
- return stream->read(target, size);
+ return static_cast<int>(stream->read(target, static_cast<size_t>(size)));
} else {
file_buffer fb = file_to_buffer(ff->contentLocation);
if (size < 0) {
diff --git a/nanofi/tests/CSite2SiteTests.cpp b/nanofi/tests/CSite2SiteTests.cpp
index e5af7b7..3db3f1c 100644
--- a/nanofi/tests/CSite2SiteTests.cpp
+++ b/nanofi/tests/CSite2SiteTests.cpp
@@ -39,6 +39,7 @@
#include "core/cstructs.h"
#include "RandomServerSocket.h"
#include "core/log.h"
+#include "utils/gsl.h"
#define FMT_DEFAULT fmt_lower
@@ -144,15 +145,15 @@ void sunny_path_bootstrap(minifi::io::BaseStream* stream, TransferState& transfe
size_t read_len = 0;
while (!found_codec) {
uint8_t handshake_data[1000];
- int actual_len = stream->read(handshake_data+read_len, 1000-read_len);
- if (actual_len <= 0) {
+ const auto actual_len = stream->read(handshake_data+read_len, 1000-read_len);
+ if(actual_len == 0 || minifi::io::isError(actual_len)) {
continue;
}
read_len += actual_len;
- std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
- auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
- if (it != incoming_data.end()) {
- size_t idx = std::distance(incoming_data.begin(), it);
+ const std::string incoming_data(reinterpret_cast<const char *>(handshake_data), read_len);
+ const auto it = std::search(incoming_data.begin(), incoming_data.end(), CODEC_NAME.begin(), CODEC_NAME.end());
+ if(it != incoming_data.end()){
+ const auto idx = gsl::narrow<size_t>(std::distance(incoming_data.begin(), it));
// Actual version follows the string as an uint32_t // that should be the end of the buffer
found_codec = idx + CODEC_NAME.length() + sizeof(uint32_t) == read_len;
}