You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/05/26 12:46:24 UTC
[nifi-minifi-cpp] 16/17: fix various issues
This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit efd9dc647219c69439b2b785b83cdac2e04b5921
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 25 12:11:59 2021 +0100
fix various issues
---
extensions/libarchive/CompressContent.h | 39 ++++++++++++++++-----------------
1 file changed, 19 insertions(+), 20 deletions(-)
diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index aae9492..b93471f 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -44,7 +44,7 @@ namespace minifi {
namespace processors {
// CompressContent Class
-class CompressContent: public core::Processor {
+class CompressContent : public core::Processor {
public:
// Constructor
/*!
@@ -120,7 +120,7 @@ public:
status_ = -1;
return -1;
}
- read_size += ret;
+ read_size += gsl::narrow<uint64_t>(ret);
} else {
break;
}
@@ -134,26 +134,24 @@ public:
std::shared_ptr<logging::Logger> logger_;
};
// Nest Callback Class for read stream from flow for decompress
- class ReadCallbackDecompress: public InputStreamCallback {
- public:
+ struct ReadCallbackDecompress : InputStreamCallback {
explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
- flow_(std::move(flow)) {
+ flow_file(std::move(flow)) {
}
~ReadCallbackDecompress() override = default;
int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
- read_size_ = 0;
- stream->seek(offset_);
- const auto readRet = stream->read(buffer_, sizeof(buffer_));
- read_size_ = readRet;
+ stream->seek(offset);
+ const auto readRet = stream->read(buffer, sizeof(buffer));
+ stream_read_result = readRet;
if (!io::isError(readRet)) {
- offset_ += readRet;
+ offset += readRet;
}
return gsl::narrow<int64_t>(readRet);
}
- size_t read_size_ = 0;
- uint8_t buffer_[8192] = {0};
- size_t offset_ = 0;
- std::shared_ptr<core::FlowFile> flow_;
+ size_t stream_read_result = 0; // read size or error code, to be checked with io::isError
+ uint8_t buffer[8192] = {0};
+ size_t offset = 0;
+ std::shared_ptr<core::FlowFile> flow_file;
};
// Nest Callback Class for write stream
class WriteCallback: public OutputStreamCallback {
@@ -192,8 +190,9 @@ public:
static la_ssize_t archive_read(struct archive*, void *context, const void **buff) {
auto *callback = (WriteCallback *) context;
callback->session_->read(callback->flow_, &callback->readDecompressCb_);
- *buff = callback->readDecompressCb_.buffer_;
- return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
+ *buff = callback->readDecompressCb_.buffer;
+ if (io::isError(callback->readDecompressCb_.stream_read_result)) return -1;
+ return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result);
}
static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -420,14 +419,14 @@ public:
* @param sessionFactory process session factory that is used when creating
* ProcessSession objects.
*/
- void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+ void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
// OnTrigger method, implemented by NiFi CompressContent
- virtual void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) {
+ void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
}
// OnTrigger method, implemented by NiFi CompressContent
- virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
// Initialize, over write by NiFi CompressContent
- virtual void initialize(void);
+ void initialize() override;
private:
static std::string toMimeType(CompressionFormat format);