Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/12/10 13:40:18 UTC

[GitHub] [nifi-minifi-cpp] adamdebreceni opened a new pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

adamdebreceni opened a new pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224


   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
arpadboda commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778006636



##########
File path: extensions/libarchive/CompressContent.cpp
##########
@@ -176,9 +177,36 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
   std::shared_ptr<core::FlowFile> result = session->create(flowFile);
   bool success = false;
   if (encapsulateInTar_) {
-    CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
-    session->write(result, &callback);
-    success = callback.status_ >= 0;
+    std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;
+
+    if (compressMode_ == CompressionMode::Compress) {
+      std::string filename;
+      flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
+      transformer = [&, filename] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::WriteArchiveStreamImpl compressor(compressLevel_, compressFormat, out);
+        if (!compressor.newEntry({filename, in->size()})) {
+          return -1;
+        }
+        return internal::pipe(in.get(), &compressor);
+      };
+    } else {
+      transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::ReadArchiveStreamImpl decompressor(in);
+        if (!decompressor.nextEntry()) {
+          return -1;
+        }
+        return internal::pipe(&decompressor, out.get());
+      };
+    }
+    session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
+      return session->read(flowFile, FunctionInputStreamCallback([&] (const auto& in) {
+        return transformer(in, out);
+      }));
+    }));
+    // TODO(adebreceni): previous attempt to handle a malformed archive were in vain

Review comment:
       Is there a follow-up ticket to address this?
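For readers following the refactor: the hunk above drops the dedicated `CompressContent::WriteCallback` class in favor of a `std::function` transformer that is chosen once from `compressMode_` and then run inside nested `session->write`/`session->read` callbacks. The sketch below mimics only that shape, using standard streams. `Mode`, `copy_stream`, and `make_transformer` are hypothetical names, and the "archive" is a toy header line, not libarchive.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

enum class Mode { Compress, Decompress };  // hypothetical, mirrors CompressionMode

using Transformer = std::function<int64_t(std::istream&, std::ostream&)>;

// Copies everything from `in` to `out` and returns the number of bytes copied.
int64_t copy_stream(std::istream& in, std::ostream& out) {
  char buf[4096];
  int64_t total = 0;
  do {
    in.read(buf, sizeof buf);
    out.write(buf, in.gcount());
    total += in.gcount();
  } while (in);
  return total;
}

// Picks the transformation once, up front. The toy format is a single
// "ENTRY <name>" header line, standing in for newEntry()/nextEntry().
Transformer make_transformer(Mode mode, std::string filename) {
  if (mode == Mode::Compress) {
    // filename is captured by value so the lambda owns its own copy
    return [filename](std::istream& in, std::ostream& out) -> int64_t {
      out << "ENTRY " << filename << '\n';
      return copy_stream(in, out);
    };
  }
  return [](std::istream& in, std::ostream& out) -> int64_t {
    std::string header;
    if (!std::getline(in, header) || header.rfind("ENTRY ", 0) != 0) {
      return -1;  // malformed input, like a failed nextEntry()
    }
    return copy_stream(in, out);
  };
}
```

Capturing `filename` by value plays the same role as the `[&, filename]` capture in the diff: there, `filename` is declared inside the `if` block, but the transformer runs after that block ends, so a by-reference capture would dangle.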







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777889252



##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_ptr : public std::unique_ptr<struct archive, int(*)(struct archive*)> {
+    using Base = std::unique_ptr<struct archive, int(*)(struct archive*)>;
+    archive_ptr(): Base(nullptr, archive_read_free) {}
+    archive_ptr(std::nullptr_t): Base(nullptr, archive_read_free) {}
+    archive_ptr(struct archive* arch): Base(arch, archive_read_free) {}
+  };

Review comment:
       Wouldn't we then have to specify the deleter `archive_read_free` manually on each construction? (Specifically at `return nullptr;` in `createReadArchive`.)
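Background for the question above: with a function-pointer deleter such as `int(*)(struct archive*)`, `std::unique_ptr` cannot be constructed from `nullptr` alone, because its null-pointer constructors require a default-constructible, non-pointer deleter. A bare `return nullptr;` therefore does not compile, and every construction has to name the deleter. A stateless functor deleter, the direction the later revision of this header takes with `archive_read_deleter`, avoids that. The sketch uses hypothetical `handle_new`/`handle_free` functions in place of `archive_read_new`/`archive_read_free` so it builds without libarchive.

```cpp
#include <cassert>
#include <memory>

// Hypothetical C-style API standing in for archive_read_new()/archive_read_free().
struct handle { int id; };
static int g_freed = 0;  // counts releases, for demonstration only

handle* handle_new() { return new handle{42}; }
int handle_free(handle* h) { delete h; ++g_freed; return 0; }

// Stateless deleter: default-constructible, so unique_ptr can be built from nullptr.
struct handle_deleter {
  int operator()(handle* h) const { return handle_free(h); }
};
using handle_ptr = std::unique_ptr<handle, handle_deleter>;

handle_ptr create(bool fail) {
  handle_ptr h{handle_new()};
  if (fail) {
    return nullptr;  // compiles: the deleter is default-constructed; h is released here
  }
  return h;
}

// Function-pointer deleter: `return nullptr;` would not compile here,
// so the deleter has to be passed explicitly on every construction.
using fp_handle_ptr = std::unique_ptr<handle, int (*)(handle*)>;

fp_handle_ptr create_fp(bool fail) {
  fp_handle_ptr h{handle_new(), handle_free};
  if (fail) {
    return fp_handle_ptr{nullptr, handle_free};
  }
  return h;
}
```

Besides the shorter call sites, a stateless deleter does not need to be stored per object, so on common implementations the smart pointer stays the size of a raw pointer.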







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778085588



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch{archive_write_new()};
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_.reset(archive_entry_new());
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
+  if (!arch_ || !arch_entry_) {
+    return STREAM_ERROR;
+  }
+
+  if (len == 0) {
+    return 0;
+  }
+  gsl_Expects(data);
+
+  int result = archive_write_data(arch_.get(), data, len);
+  if (result < 0) {
+    logger_->log_error("Archive write data error %s", archive_error_string(arch_.get()));
+    arch_entry_.reset();
+    arch_.reset();
+    return STREAM_ERROR;
+  }
+
+  return len;

Review comment:
       yes, changed it to return the result
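The reply above refers to `write()` reporting how many bytes `archive_write_data` actually accepted instead of echoing `len`. A minimal sketch of that contract follows. It assumes that `STREAM_ERROR` is `static_cast<size_t>(-1)` and uses a hypothetical `FakeArchive`, whose `write_data` may accept fewer bytes than requested, in place of libarchive; `write_to_archive` is also an illustrative name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumption for this sketch: the error sentinel is the largest size_t value.
constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);

// Hypothetical backend: accepts at most `capacity` bytes per call and returns
// a negative value once closed (libarchive's archive_write_data likewise
// returns a signed byte count, negative on error).
struct FakeArchive {
  size_t capacity;
  bool closed = false;
  std::ptrdiff_t write_data(const uint8_t* /*data*/, size_t len) const {
    if (closed) {
      return -1;
    }
    return static_cast<std::ptrdiff_t>(len < capacity ? len : capacity);
  }
};

size_t write_to_archive(const FakeArchive& arch, const uint8_t* data, size_t len) {
  if (len == 0) {
    return 0;
  }
  const std::ptrdiff_t result = arch.write_data(data, len);
  if (result < 0) {
    return STREAM_ERROR;
  }
  return static_cast<size_t>(result);  // bytes actually accepted, not `len`
}
```

Returning the accepted count lets a caller, such as a copy loop, notice a short write instead of assuming the whole buffer was consumed.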







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r767743992



##########
File path: extensions/libarchive/ReadArchiveStream.cpp
##########
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+ReadArchiveStreamImpl::archive_ptr ReadArchiveStreamImpl::createReadArchive() {
+  archive_ptr arch = archive_read_new();
+  if (!arch) {
+    logger_->log_error("Failed to create read archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_read_support_format_all(arch.get());

Review comment:
       Since this variable is reused for every operation, I was reluctant to merge the declaration with this first assignment: that would suggest the result belongs to this operation only. I'm not sure whether that is just me, though. Should we merge them?







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778085157



##########
File path: extensions/libarchive/CompressContent.cpp
##########
@@ -176,9 +177,36 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
   std::shared_ptr<core::FlowFile> result = session->create(flowFile);
   bool success = false;
   if (encapsulateInTar_) {
-    CompressContent::WriteCallback callback(compressMode_, compressLevel_, compressFormat, flowFile, session);
-    session->write(result, &callback);
-    success = callback.status_ >= 0;
+    std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;
+
+    if (compressMode_ == CompressionMode::Compress) {
+      std::string filename;
+      flowFile->getAttribute(core::SpecialFlowAttribute::FILENAME, filename);
+      transformer = [&, filename] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::WriteArchiveStreamImpl compressor(compressLevel_, compressFormat, out);
+        if (!compressor.newEntry({filename, in->size()})) {
+          return -1;
+        }
+        return internal::pipe(in.get(), &compressor);
+      };
+    } else {
+      transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
+        io::ReadArchiveStreamImpl decompressor(in);
+        if (!decompressor.nextEntry()) {
+          return -1;
+        }
+        return internal::pipe(&decompressor, out.get());
+      };
+    }
+    session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
+      return session->read(flowFile, FunctionInputStreamCallback([&] (const auto& in) {
+        return transformer(in, out);
+      }));
+    }));
+    // TODO(adebreceni): previous attempt to handle a malformed archive were in vain

Review comment:
       Created a ticket and added the link as a comment.

##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {
+      size_t result = input_->read(buffer_.data(), buffer_.size());
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;
+    std::array<uint8_t, 4096> buffer_;
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {
+    arch_ = createReadArchive();
+  }
+
+  std::shared_ptr<InputStream> input_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadArchiveStream>::getLogger();
+  BufferedReader reader_;
+  archive_ptr arch_;

Review comment:
       made them private
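The `BufferedReader` in the header quoted above reads fixed-size chunks from the input stream into a buffer it owns and reports failure as `std::nullopt`. Owning the buffer matters for a libarchive-style pull callback, which hands the library a pointer to the data rather than copying it. This sketch assumes libarchive's usual read-callback contract, where the returned block only has to stay valid until the callback is invoked again. It is self-contained and simplified: `Chunk`, the 4-byte buffer, and the `read_callback` signature are hypothetical (the real `archive_read_callback` also receives the `struct archive*` and returns `la_ssize_t`).

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <istream>
#include <optional>
#include <sstream>

struct Chunk {  // hypothetical; the PR uses gsl::span<const uint8_t>
  const uint8_t* data;
  size_t size;
};

class BufferedReader {
 public:
  explicit BufferedReader(std::istream& input) : input_(input) {}

  // std::nullopt on a stream error; a zero-size chunk signals end of input.
  std::optional<Chunk> readChunk() {
    input_.read(reinterpret_cast<char*>(buffer_.data()),
                static_cast<std::streamsize>(buffer_.size()));
    if (input_.bad()) {
      return std::nullopt;
    }
    return Chunk{buffer_.data(), static_cast<size_t>(input_.gcount())};
  }

 private:
  std::istream& input_;
  std::array<uint8_t, 4> buffer_{};  // tiny for the demo; the PR uses 4096 bytes
};

// Simplified, hypothetical pull callback: points *out_buffer at the reader's own
// buffer (valid until the next call) and returns the byte count, 0 at end of
// input, or -1 on error.
long read_callback(void* client_data, const void** out_buffer) {
  auto* reader = static_cast<BufferedReader*>(client_data);
  std::optional<Chunk> chunk = reader->readChunk();
  if (!chunk) {
    return -1;
  }
  *out_buffer = chunk->data;
  return static_cast<long>(chunk->size);
}
```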

##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {
+      size_t result = input_->read(buffer_.data(), buffer_.size());
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;
+    std::array<uint8_t, 4096> buffer_;
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {
+    arch_ = createReadArchive();
+  }
+
+  std::shared_ptr<InputStream> input_;

Review comment:
       removed







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777891867



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());

Review comment:
       I'll leave it as is if there is no strong preference 







[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777567108



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());

Review comment:
       My preference would be a separate scope for each of the result variables, like an [SSA](https://en.wikipedia.org/wiki/Static_single_assignment_form) form done by hand, to avoid giving the same variable multiple meanings. I can also live with it as is.
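One way to get the per-variable scoping suggested above without inventing new names is C++17's `if` statement with an initializer: each `result` lives only inside its own `if`, so it cannot be mistaken for the outcome of a different call. The setup steps and `kOk` below are hypothetical stand-ins for the `archive_write_*` calls and `ARCHIVE_OK`.

```cpp
#include <cassert>
#include <cstdio>

constexpr int kOk = 0;  // stands in for ARCHIVE_OK

// Hypothetical setup steps: each records progress in `state` and returns kOk or an error code.
int set_format(int& state) { state |= 1; return kOk; }
int add_filter(int& state, bool fail) {
  if (fail) {
    return -1;
  }
  state |= 2;
  return kOk;
}
int open_sink(int& state) { state |= 4; return kOk; }

bool configure(int& state, bool fail_filter) {
  // Each `result` is scoped to its own if statement and assigned exactly once.
  if (const int result = set_format(state); result != kOk) {
    std::fprintf(stderr, "set format error %d\n", result);
    return false;
  }
  if (const int result = add_filter(state, fail_filter); result != kOk) {
    std::fprintf(stderr, "add filter error %d\n", result);
    return false;
  }
  if (const int result = open_sink(state); result != kOk) {
    std::fprintf(stderr, "open error %d\n", result);
    return false;
  }
  return true;
}
```

Declaring each `result` as `const` additionally documents that it is assigned once, which is the property the SSA comparison is after.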







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777891164



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_ = archive_entry_new();
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {

Review comment:
       added precondition
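The precondition referred to here is the `gsl_Expects(data)` call that follows the `len == 0` early return in the revised `write()` quoted in another message of this thread. Checking the length first matters because an empty buffer may legitimately arrive with a null pointer (for instance, `data()` of an empty `std::vector` may return `nullptr`). In this sketch, `expects` is a hypothetical stand-in that throws so a violation can be observed in a test; what `gsl_Expects` does on violation depends on how GSL is configured. `checked_write` is likewise an illustrative name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for gsl_Expects: throws so a violated precondition is testable.
inline void expects(bool condition, const char* message) {
  if (!condition) {
    throw std::invalid_argument(message);
  }
}

size_t checked_write(std::vector<uint8_t>& sink, const uint8_t* data, size_t len) {
  if (len == 0) {
    return 0;  // empty writes are accepted even when data == nullptr
  }
  expects(data != nullptr, "data must not be null when len > 0");
  sink.insert(sink.end(), data, data + len);
  return len;
}
```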







[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777889252



##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_ptr : public std::unique_ptr<struct archive, int(*)(struct archive*)> {
+    using Base = std::unique_ptr<struct archive, int(*)(struct archive*)>;
+    archive_ptr(): Base(nullptr, archive_read_free) {}
+    archive_ptr(std::nullptr_t): Base(nullptr, archive_read_free) {}
+    archive_ptr(struct archive* arch): Base(arch, archive_read_free) {}
+  };

Review comment:
       wouldn't we then have to manually specify the deleter `archive_read_free` on each construction? (specifically at `return nullptr;` in `createReadArchive`)
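A minimal sketch of the deleter-type approach, assuming C++17. When the deleter is a stateless class type rather than a function pointer, `std::unique_ptr` default-constructs it. That means `return nullptr;` compiles without naming `archive_read_free`. The `struct archive` and `archive_read_free` below are hypothetical stand-ins for the libarchive declarations, so the snippet builds without linking libarchive. `read_free_calls` is a hypothetical counter added only for illustration.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-ins for libarchive's opaque handle and free function,
// so the sketch compiles without linking libarchive.
struct archive {};
int read_free_calls = 0;
int archive_read_free(archive* a) {
  ++read_free_calls;
  delete a;
  return 0;
}

// Stateless deleter type: unique_ptr default-constructs it, so no call site
// has to pass archive_read_free explicitly.
struct archive_read_deleter {
  void operator()(archive* a) const { archive_read_free(a); }
};
using archive_read_ptr = std::unique_ptr<archive, archive_read_deleter>;

archive_read_ptr createReadArchive(bool fail) {
  if (fail) {
    // Compiles as-is: unique_ptr(nullptr_t) default-constructs the deleter.
    return nullptr;
  }
  return archive_read_ptr{new archive{}};
}
```

`unique_ptr` only invokes the deleter for a non-null pointer. A failed (null) result therefore never reaches `archive_read_free`.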





[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
szaszm commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777567955



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_ = archive_entry_new();
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {

Review comment:
       It would be nice to specify a precondition here requiring non-null data. I'm not sure about the requirements on len, but it would seem logical either to require it to be greater than zero, or to require that data is null exactly when len is zero (both or neither).
   ```suggestion
   size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
     gsl_Expects(data);  // maybe gsl_Expects(data && len > 0); or gsl_Expects(!data == (len == 0));
   ```
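The three candidate preconditions can be compared by writing each one as a plain predicate. This is a sketch only. `gsl_Expects` is MiNiFi's contract macro and is replaced here by `assert`. The predicate names and `checked_write` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Option 1: data must be non-null; len is unconstrained.
bool requires_data(const std::uint8_t* data, std::size_t /*len*/) {
  return data != nullptr;
}

// Option 2: non-null data and a non-empty write.
bool requires_data_and_len(const std::uint8_t* data, std::size_t len) {
  return data != nullptr && len > 0;
}

// Option 3: data is null exactly when len is zero ("both or neither").
bool requires_consistent_pair(const std::uint8_t* data, std::size_t len) {
  return !data == (len == 0);
}

// Hypothetical write() stand-in; assert plays the role of gsl_Expects.
std::size_t checked_write(const std::uint8_t* data, std::size_t len) {
  assert(requires_consistent_pair(data, len));
  return len;  // the real stream forwards to archive_write_data at this point
}
```

Option 3 also rejects a non-null pointer passed with `len == 0`. That case can occur legitimately, for example when writing an empty range over a valid buffer, so option 3 is stricter than it may first appear.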

##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_ptr : public std::unique_ptr<struct archive, int(*)(struct archive*)> {
+    using Base = std::unique_ptr<struct archive, int(*)(struct archive*)>;
+    archive_ptr(): Base(nullptr, archive_read_free) {}
+    archive_ptr(std::nullptr_t): Base(nullptr, archive_read_free) {}
+    archive_ptr(struct archive* arch): Base(arch, archive_read_free) {}
+  };

Review comment:
       I think it's simpler to define deleter types and unique_ptr type aliases instead.
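One way to express the suggested deleter types and aliases, sketched under the assumption that C++17 `auto` template parameters are available. A single `free_with` template (a hypothetical name) turns any free function into a stateless deleter type. The `archive` and `archive_entry` structs and the free functions are hypothetical stand-ins that mimic libarchive's signatures: `archive_read_free` and `archive_write_free` return `int`, and `archive_entry_free` returns `void`. `frees` is a hypothetical counter.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins for libarchive's handle types and free functions.
struct archive {};
struct archive_entry {};
int frees = 0;
int archive_read_free(archive* a) { ++frees; delete a; return 0; }
int archive_write_free(archive* a) { ++frees; delete a; return 0; }
void archive_entry_free(archive_entry* e) { ++frees; delete e; }

// Reusable deleter: the free function is a template argument, so every
// instantiation is an empty, default-constructible type.
template <auto FreeFn>
struct free_with {
  template <typename T>
  void operator()(T* ptr) const { FreeFn(ptr); }
};
static_assert(std::is_empty_v<free_with<archive_read_free>>, "deleter carries no state");

// One alias per handle kind; the deleter is part of the type.
using archive_read_ptr = std::unique_ptr<archive, free_with<archive_read_free>>;
using archive_write_ptr = std::unique_ptr<archive, free_with<archive_write_free>>;
using archive_entry_ptr = std::unique_ptr<archive_entry, free_with<archive_entry_free>>;
```

Because each alias names its deleter in the type, default-constructed and `nullptr`-initialized pointers need no extra constructor arguments. This removes the need for the hand-written `archive_ptr` constructors in the diff above.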

##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());

Review comment:
       My preference would be a separate scope for each of the result variables, but I can live with it as is.
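A sketch of the separate-scope style, assuming C++17 if-statements with initializers. Each `result` is declared inside the `if` that checks it, so it cannot leak into later steps. `kOk`, `kFatal`, `set_format`, and `add_filter` are hypothetical stand-ins for the libarchive status codes and setup calls used in `createWriteArchive`.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for libarchive status codes and setup calls.
constexpr int kOk = 0;
constexpr int kFatal = -30;
int set_format(bool ok) { return ok ? kOk : kFatal; }
int add_filter(bool ok) { return ok ? kOk : kFatal; }

// Each status variable is scoped to the single check that uses it.
std::string setup(bool format_ok, bool filter_ok) {
  if (int result = set_format(format_ok); result != kOk) {
    return "set format failed: " + std::to_string(result);
  }
  if (int result = add_filter(filter_ok); result != kOk) {
    return "add filter failed: " + std::to_string(result);
  }
  return "ok";
}
```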

##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_ = archive_entry_new();
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
+  if (!arch_ || !arch_entry_) {
+    return STREAM_ERROR;
+  }
+
+  int result = archive_write_data(arch_.get(), data, len);
+  if (result < 0) {
+    logger_->log_error("Archive write data error %s", archive_error_string(arch_.get()));
+    arch_entry_.reset();
+    arch_.reset();
+    return STREAM_ERROR;
+  }
+
+  return len;
+}
+
+class ArchiveStreamProviderImpl : public ArchiveStreamProvider {
+ public:
+  using ArchiveStreamProvider::ArchiveStreamProvider;
+  std::unique_ptr<WriteArchiveStream> createWriteStream(int compress_level, const std::string& compress_format,
+                                              std::shared_ptr<OutputStream> sink, std::shared_ptr<core::logging::Logger> logger) override {
+    CompressionFormat format = CompressionFormat::parse(compress_format.c_str(), CompressionFormat{});
+    if (!format) {
+      if (logger) {
+        logger->log_error("Unrecognized compression format '%s'", compress_format);
+      }
+      return nullptr;
+    }
+    return std::make_unique<WriteArchiveStreamImpl>(compress_level, format, std::move(sink));
+  }
+
+  std::unique_ptr<ReadArchiveStream> createReadStream(std::shared_ptr<InputStream> archive_stream) override {
+    return std::make_unique<ReadArchiveStreamImpl>(std::move(archive_stream));
+  }
+};
+
+REGISTER_INTERNAL_RESOURCE_AS(ArchiveStreamProviderImpl, ("ArchiveStreamProvider"));

Review comment:
       We should move this to a separate file to avoid a WriteArchiveStream -> ReadArchiveStream dependency. It's fine with me if it has an empty header, just a forward declaration of the class, or no header at all.





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r766717769



##########
File path: extensions/libarchive/ReadArchiveStream.cpp
##########
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+ReadArchiveStreamImpl::archive_ptr ReadArchiveStreamImpl::createReadArchive() {

Review comment:
       tests incoming





[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
szaszm closed pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224


   



[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777890318



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_ = archive_entry_new();
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
+  if (!arch_ || !arch_entry_) {
+    return STREAM_ERROR;
+  }
+
+  int result = archive_write_data(arch_.get(), data, len);
+  if (result < 0) {
+    logger_->log_error("Archive write data error %s", archive_error_string(arch_.get()));
+    arch_entry_.reset();
+    arch_.reset();
+    return STREAM_ERROR;
+  }
+
+  return len;
+}
+
+class ArchiveStreamProviderImpl : public ArchiveStreamProvider {
+ public:
+  using ArchiveStreamProvider::ArchiveStreamProvider;
+  std::unique_ptr<WriteArchiveStream> createWriteStream(int compress_level, const std::string& compress_format,
+                                              std::shared_ptr<OutputStream> sink, std::shared_ptr<core::logging::Logger> logger) override {
+    CompressionFormat format = CompressionFormat::parse(compress_format.c_str(), CompressionFormat{});
+    if (!format) {
+      if (logger) {
+        logger->log_error("Unrecognized compression format '%s'", compress_format);
+      }
+      return nullptr;
+    }
+    return std::make_unique<WriteArchiveStreamImpl>(compress_level, format, std::move(sink));
+  }
+
+  std::unique_ptr<ReadArchiveStream> createReadStream(std::shared_ptr<InputStream> archive_stream) override {
+    return std::make_unique<ReadArchiveStreamImpl>(std::move(archive_stream));
+  }
+};
+
+REGISTER_INTERNAL_RESOURCE_AS(ArchiveStreamProviderImpl, ("ArchiveStreamProvider"));

Review comment:
       moved it to a separate source file





[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r777896764



##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_ptr : public std::unique_ptr<struct archive, int(*)(struct archive*)> {
+    using Base = std::unique_ptr<struct archive, int(*)(struct archive*)>;
+    archive_ptr(): Base(nullptr, archive_read_free) {}
+    archive_ptr(std::nullptr_t): Base(nullptr, archive_read_free) {}
+    archive_ptr(struct archive* arch): Base(arch, archive_read_free) {}
+  };

Review comment:
       done





[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r767756319



##########
File path: extensions/libarchive/ReadArchiveStream.cpp
##########
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+ReadArchiveStreamImpl::archive_ptr ReadArchiveStreamImpl::createReadArchive() {
+  archive_ptr arch = archive_read_new();
+  if (!arch) {
+    logger_->log_error("Failed to create read archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_read_support_format_all(arch.get());

Review comment:
       I didn't think about it like this, but if you think it improves the representation of this value and gives context to you or anyone else reading this, I don't insist on merging it.





[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
lordgamez commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r767623930



##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+#include "core/Resource.h"
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch = archive_write_new();
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());

Review comment:
       These 2 lines could also be merged

##########
File path: extensions/libarchive/ReadArchiveStream.cpp
##########
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReadArchiveStream.h"
+
+namespace org::apache::nifi::minifi::io {
+
+ReadArchiveStreamImpl::archive_ptr ReadArchiveStreamImpl::createReadArchive() {
+  archive_ptr arch = archive_read_new();
+  if (!arch) {
+    logger_->log_error("Failed to create read archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_read_support_format_all(arch.get());

Review comment:
       These 2 lines (the `int result;` declaration and the assignment) could be merged




[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
fgerlits commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778014273



##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {
+      size_t result = input_->read(buffer_.data(), buffer_.size());
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;
+    std::array<uint8_t, 4096> buffer_;
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {
+    arch_ = createReadArchive();
+  }
+
+  std::shared_ptr<InputStream> input_;

Review comment:
       it looks like `input_` has been moved inside `BufferedReader`, and can be removed from here
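A sketch of the layout with the stream owned only by `BufferedReader`. In the quoted constructor, `input` is moved into `reader_`, so a separate `input_` member would never be initialized and would stay empty. Everything here is a hypothetical stand-in: `InputStream`, `STREAM_ERROR`, `isError` and `VectorStream` mimic the minifi types, `Chunk` replaces `gsl::span`, and the buffer is shrunk to 4 bytes so the test stays small.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-ins for minifi's io::InputStream and its error value.
constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);
inline bool isError(size_t result) { return result == STREAM_ERROR; }

struct InputStream {
  virtual ~InputStream() = default;
  virtual size_t read(uint8_t* out, size_t len) = 0;
};

// In-memory stream used only to exercise the sketch.
class VectorStream : public InputStream {
 public:
  explicit VectorStream(std::vector<uint8_t> data) : data_(std::move(data)) {}
  size_t read(uint8_t* out, size_t len) override {
    size_t n = std::min(len, data_.size() - pos_);
    if (n > 0) std::memcpy(out, data_.data() + pos_, n);
    pos_ += n;
    return n;
  }

 private:
  std::vector<uint8_t> data_;
  size_t pos_ = 0;
};

// Plain pointer/size pair in place of gsl::span<const uint8_t>.
struct Chunk {
  const uint8_t* data;
  size_t size;
};

class BufferedReader {
 public:
  explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}

  // The returned chunk points into buffer_ and is only valid until the next call.
  std::optional<Chunk> readChunk() {
    size_t result = input_->read(buffer_.data(), buffer_.size());
    if (isError(result)) {
      return std::nullopt;
    }
    return Chunk{buffer_.data(), result};
  }

 private:
  std::shared_ptr<InputStream> input_;
  std::array<uint8_t, 4> buffer_{};  // 4 bytes instead of 4096 to keep the test small
};

class ReadArchiveStreamSketch {
 public:
  explicit ReadArchiveStreamSketch(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {}

 private:
  BufferedReader reader_;  // the only owner of the stream; no separate input_ member
};
```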

##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {
+  archive_ptr arch{archive_write_new()};
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_.reset(archive_entry_new());
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
+  if (!arch_ || !arch_entry_) {
+    return STREAM_ERROR;
+  }
+
+  if (len == 0) {
+    return 0;
+  }
+  gsl_Expects(data);
+
+  int result = archive_write_data(arch_.get(), data, len);
+  if (result < 0) {
+    logger_->log_error("Archive write data error %s", archive_error_string(arch_.get()));
+    arch_entry_.reset();
+    arch_.reset();
+    return STREAM_ERROR;
+  }
+
+  return len;

Review comment:
       Should we return `result`?  I would expect the return value to be the number of bytes written, not the size of the input.  This is what other `OutputStream::write()` implementations do.
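A sketch of the suggested behaviour. libarchive documents `archive_write_data()` as returning the number of bytes actually written, or a negative value on error. `FakeArchive` and `STREAM_ERROR` below are hypothetical stand-ins that model an archive accepting fewer bytes than requested. The sketch also keeps the result in a signed type at least as wide as the length instead of an `int`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for minifi's STREAM_ERROR sentinel.
constexpr size_t STREAM_ERROR = static_cast<size_t>(-1);

// Hypothetical stand-in for archive_write_data(): it may accept fewer bytes
// than requested and returns a negative value on failure.
struct FakeArchive {
  std::ptrdiff_t max_per_call;
  bool broken = false;

  std::ptrdiff_t writeData(const uint8_t* /*data*/, size_t len) const {
    if (broken) {
      return -1;
    }
    return std::min(max_per_call, static_cast<std::ptrdiff_t>(len));
  }
};

size_t writeToArchive(const FakeArchive& arch, const uint8_t* data, size_t len) {
  if (len == 0) {
    return 0;
  }
  // Keep the result in a signed type at least as wide as the byte count,
  // rather than narrowing it to int.
  std::ptrdiff_t result = arch.writeData(data, len);
  if (result < 0) {
    return STREAM_ERROR;
  }
  // Report the bytes the archive actually accepted, which may be fewer than len.
  return static_cast<size_t>(result);
}
```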

##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {
+      size_t result = input_->read(buffer_.data(), buffer_.size());
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;
+    std::array<uint8_t, 4096> buffer_;
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {
+    arch_ = createReadArchive();
+  }
+
+  std::shared_ptr<InputStream> input_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadArchiveStream>::getLogger();
+  BufferedReader reader_;
+  archive_ptr arch_;

Review comment:
       do all these members need to be public?
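A sketch of the read stream with its state moved into a `private:` section, using hypothetical stand-ins for `InputStream`, `Logger` and `BufferedReader`. The detection trait at the end is only an illustration of how the encapsulation could be checked at compile time. It relies on access checking being part of template argument substitution, so an inaccessible member makes the specialization drop out.

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for the real member types.
struct InputStream {};
struct Logger {};

class BufferedReader {
 public:
  explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}

 private:
  std::shared_ptr<InputStream> input_;
};

class ReadArchiveStreamSketch {
 public:
  explicit ReadArchiveStreamSketch(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {}
  // The public interface inherited from ReadArchiveStream would be declared here.

 private:
  std::shared_ptr<Logger> logger_ = std::make_shared<Logger>();
  BufferedReader reader_;
};

// For comparison: the same member name left public.
struct PublicReaderHolder {
  int reader_ = 0;
};

// Detection idiom: true only if `t.reader_` is accessible from namespace scope.
template <typename T, typename = void>
struct has_public_reader : std::false_type {};

template <typename T>
struct has_public_reader<T, std::void_t<decltype(std::declval<T&>().reader_)>> : std::true_type {};
```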

##########
File path: extensions/libarchive/WriteArchiveStream.h
##########
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <string>
+
+#include "io/ArchiveStream.h"
+#include "archive_entry.h"
+#include "archive.h"
+#include "utils/Enum.h"
+#include "core/Core.h"
+#include "logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::io {
+
+SMART_ENUM(CompressionFormat,
+  (GZIP, "gzip"),
+  (LZMA, "lzma"),
+  (XZ_LZMA2, "xz-lzma2"),
+  (BZIP2, "bzip2")
+)
+
+class WriteArchiveStreamImpl: public WriteArchiveStream {
+  struct archive_write_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_write_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_write_deleter>;
+  struct archive_entry_deleter {
+    void operator()(struct archive_entry* ptr) const {
+      archive_entry_free(ptr);
+    }
+  };
+  using archive_entry_ptr = std::unique_ptr<struct archive_entry, archive_entry_deleter>;
+
+  archive_ptr createWriteArchive();
+
+ public:
+  WriteArchiveStreamImpl(int compress_level, CompressionFormat compress_format, std::shared_ptr<OutputStream> sink)
+    : compress_level_(compress_level),
+      compress_format_(compress_format),
+      sink_(std::move(sink)) {
+    arch_ = createWriteArchive();
+  }
+
+  int compress_level_;
+  CompressionFormat compress_format_;
+  std::shared_ptr<io::OutputStream> sink_;
+  int status_{0};
+  archive_ptr arch_;
+  archive_entry_ptr arch_entry_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteArchiveStreamImpl>::getLogger();

Review comment:
       do these all need to be public?
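A sketch of the write stream with private members. One detail worth keeping when the members move: the quoted constructor calls `createWriteArchive()` in its body, not in the initializer list. `logger_` is declared last, so initializing `arch_` in the initializer list would call a function that logs through a not-yet-initialized member. `OutputStream`, `Logger` and `create()` are hypothetical stand-ins. The logger is injected here only so the test can inspect it, whereas the quoted header uses a default member initializer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for io::OutputStream and core::logging::Logger.
struct OutputStream {};
struct Logger {
  std::vector<std::string> errors;
  void log_error(std::string msg) { errors.push_back(std::move(msg)); }
};

class WriteArchiveStreamSketch {
 public:
  WriteArchiveStreamSketch(std::shared_ptr<OutputStream> sink, std::shared_ptr<Logger> logger)
      : sink_(std::move(sink)), logger_(std::move(logger)) {
    // Runs in the constructor body, after every member (including logger_,
    // which is declared last) has been initialized.
    valid_ = create();
  }

  bool isValid() const { return valid_; }

 private:
  // Stand-in for createWriteArchive(): it reads other members, so it must
  // not run before they are initialized.
  bool create() {
    if (!sink_) {
      logger_->log_error("No sink to write the archive to");
      return false;
    }
    return true;
  }

  std::shared_ptr<OutputStream> sink_;
  bool valid_ = false;
  std::shared_ptr<Logger> logger_;
};
```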




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

Posted by GitBox <gi...@apache.org>.
adamdebreceni commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778085695



##########
File path: extensions/libarchive/WriteArchiveStream.h
##########
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <string>
+
+#include "io/ArchiveStream.h"
+#include "archive_entry.h"
+#include "archive.h"
+#include "utils/Enum.h"
+#include "core/Core.h"
+#include "logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::io {
+
+SMART_ENUM(CompressionFormat,
+  (GZIP, "gzip"),
+  (LZMA, "lzma"),
+  (XZ_LZMA2, "xz-lzma2"),
+  (BZIP2, "bzip2")
+)
+
+class WriteArchiveStreamImpl: public WriteArchiveStream {
+  struct archive_write_deleter {
+    int operator()(struct archive* ptr) const {
+      return archive_write_free(ptr);
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_write_deleter>;
+  struct archive_entry_deleter {
+    void operator()(struct archive_entry* ptr) const {
+      archive_entry_free(ptr);
+    }
+  };
+  using archive_entry_ptr = std::unique_ptr<struct archive_entry, archive_entry_deleter>;
+
+  archive_ptr createWriteArchive();
+
+ public:
+  WriteArchiveStreamImpl(int compress_level, CompressionFormat compress_format, std::shared_ptr<OutputStream> sink)
+    : compress_level_(compress_level),
+      compress_format_(compress_format),
+      sink_(std::move(sink)) {
+    arch_ = createWriteArchive();
+  }
+
+  int compress_level_;
+  CompressionFormat compress_format_;
+  std::shared_ptr<io::OutputStream> sink_;
+  int status_{0};
+  archive_ptr arch_;
+  archive_entry_ptr arch_entry_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteArchiveStreamImpl>::getLogger();

Review comment:
       made them private



