You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2021/05/26 12:46:24 UTC

[nifi-minifi-cpp] 16/17: fix various issues

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch MINIFICPP-1507
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit efd9dc647219c69439b2b785b83cdac2e04b5921
Author: Marton Szasz <sz...@gmail.com>
AuthorDate: Thu Mar 25 12:11:59 2021 +0100

    fix various issues
---
 extensions/libarchive/CompressContent.h | 39 ++++++++++++++++-----------------
 1 file changed, 19 insertions(+), 20 deletions(-)

diff --git a/extensions/libarchive/CompressContent.h b/extensions/libarchive/CompressContent.h
index aae9492..b93471f 100644
--- a/extensions/libarchive/CompressContent.h
+++ b/extensions/libarchive/CompressContent.h
@@ -44,7 +44,7 @@ namespace minifi {
 namespace processors {
 
 // CompressContent Class
-class CompressContent: public core::Processor {
+class CompressContent : public core::Processor {
 public:
   // Constructor
   /*!
@@ -120,7 +120,7 @@ public:
             status_ = -1;
             return -1;
           }
-          read_size += ret;
+          read_size += gsl::narrow<uint64_t>(ret);
         } else {
           break;
         }
@@ -134,26 +134,24 @@ public:
     std::shared_ptr<logging::Logger> logger_;
   };
   // Nest Callback Class for read stream from flow for decompress
-  class ReadCallbackDecompress: public InputStreamCallback {
-  public:
+  struct ReadCallbackDecompress : InputStreamCallback {
     explicit ReadCallbackDecompress(std::shared_ptr<core::FlowFile> flow) :
-        flow_(std::move(flow)) {
+        flow_file(std::move(flow)) {
     }
     ~ReadCallbackDecompress() override = default;
     int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
-      read_size_ = 0;
-      stream->seek(offset_);
-      const auto readRet = stream->read(buffer_, sizeof(buffer_));
-      read_size_ = readRet;
+      stream->seek(offset);
+      const auto readRet = stream->read(buffer, sizeof(buffer));
+      stream_read_result = readRet;
       if (!io::isError(readRet)) {
-        offset_ += readRet;
+        offset += readRet;
       }
       return gsl::narrow<int64_t>(readRet);
     }
-    size_t read_size_ = 0;
-    uint8_t buffer_[8192] = {0};
-    size_t offset_ = 0;
-    std::shared_ptr<core::FlowFile> flow_;
+    size_t stream_read_result = 0;  // read size or error code, to be checked with io::isError
+    uint8_t buffer[8192] = {0};
+    size_t offset = 0;
+    std::shared_ptr<core::FlowFile> flow_file;
   };
   // Nest Callback Class for write stream
   class WriteCallback: public OutputStreamCallback {
@@ -192,8 +190,9 @@ public:
     static la_ssize_t archive_read(struct archive*, void *context, const void **buff) {
       auto *callback = (WriteCallback *) context;
       callback->session_->read(callback->flow_, &callback->readDecompressCb_);
-      *buff = callback->readDecompressCb_.buffer_;
-      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.read_size_);
+      *buff = callback->readDecompressCb_.buffer;
+      if (io::isError(callback->readDecompressCb_.stream_read_result)) return -1;
+      return gsl::narrow<la_ssize_t>(callback->readDecompressCb_.stream_read_result);
     }
 
     static la_int64_t archive_skip(struct archive* /*a*/, void* /*client_data*/, la_int64_t /*request*/) {
@@ -420,14 +419,14 @@ public:
    * @param sessionFactory process session factory that is used when creating
    * ProcessSession objects.
    */
-  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory);
+  void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override;
   // OnTrigger method, implemented by NiFi CompressContent
-  virtual void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) {
+  void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override {
   }
   // OnTrigger method, implemented by NiFi CompressContent
-  virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session);
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
   // Initialize, over write by NiFi CompressContent
-  virtual void initialize(void);
+  void initialize() override;
 
 private:
   static std::string toMimeType(CompressionFormat format);