You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/04 13:10:14 UTC

[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

arpadboda commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778006636



##########
File path: extensions/libarchive/CompressContent.cpp
##########
@@ -176,9 +177,36 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
   std::shared_ptr<core::FlowFile> result = session->create(flowFile);
   bool success = false;
   if (encapsulateInTar_) {
-    CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
-    session->write(result, &callback);
-    success = callback.status_ >= 0;
+    std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;
+
+    if (compressMode_ == CompressionMode::Compress) {
+      std::string filename;
+      flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
+      transformer = [&, filename] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::WriteArchiveStreamImpl compressor(compressLevel_, compressFormat, out);
+        if (!compressor.newEntry({filename, in->size()})) {
+          return -1;
+        }
+        return internal::pipe(in.get(), &compressor);
+      };
+    } else {
+      transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::ReadArchiveStreamImpl decompressor(in);
+        if (!decompressor.nextEntry()) {
+          return -1;
+        }
+        return internal::pipe(&decompressor, out.get());
+      };
+    }
+    session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
+      return session->read(flowFile, FunctionInputStreamCallback([&] (const auto& in) {
+        return transformer(in, out);
+      }));
+    }));
+    // TODO(adebreceni): previous attempt to handle a malformed archive were in vain

Review comment:
       Is there a follow-up ticket to address this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org