You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/01/04 13:16:28 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1224: MINIFICPP-1698 - Make archive read/write agent-wide available

fgerlits commented on a change in pull request #1224:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1224#discussion_r778014273



##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {  // Deleter functor for archive_ptr (the std::unique_ptr over struct archive declared just below).
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);  // hands the handle back to libarchive's read-side free function
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {  // Reads the wrapped InputStream in chunks through a fixed-size internal buffer.
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}  // takes shared ownership of the stream
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {  // Returns the bytes read by one read() call, or std::nullopt if the stream reported an error.
+      size_t result = input_->read(buffer_.data(), buffer_.size());  // asks for at most buffer_.size() (4096) bytes
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);  // the span views buffer_, so the next readChunk() call overwrites its contents
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;  // source stream
+    std::array<uint8_t, 4096> buffer_;  // backing storage for the span returned by readChunk()
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {  // Moves the stream into reader_; the input_ member declared below is not set by this constructor.
+    arch_ = createReadArchive();  // createReadArchive() is defined outside this excerpt
+  }
+
+  std::shared_ptr<InputStream> input_;

Review comment:
       It looks like `input_` has been moved inside `BufferedReader`, so it can be removed from here.

##########
File path: extensions/libarchive/WriteArchiveStream.cpp
##########
@@ -0,0 +1,131 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "WriteArchiveStream.h"
+
+#include <utility>
+#include <string>
+
+namespace org::apache::nifi::minifi::io {
+
+WriteArchiveStreamImpl::archive_ptr WriteArchiveStreamImpl::createWriteArchive() {  // Creates, configures and opens the libarchive write handle; every failure path logs an error and returns nullptr.
+  archive_ptr arch{archive_write_new()};  // owned by archive_ptr, so the early returns below release the handle through its deleter
+  if (!arch) {
+    logger_->log_error("Failed to create write archive");
+    return nullptr;
+  }
+
+  int result;
+
+  result = archive_write_set_format_ustar(arch.get());  // the container format is always ustar, whatever the compression format
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set format ustar error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  if (compress_format_ == CompressionFormat::GZIP) {  // add the compression filter that matches compress_format_
+    result = archive_write_add_filter_gzip(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter gzip error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+    std::string option = "gzip:compression-level=" + std::to_string(compress_level_);  // compress_level_ is applied on the gzip branch only
+    result = archive_write_set_options(arch.get(), option.c_str());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write set options error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::BZIP2) {
+    result = archive_write_add_filter_bzip2(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter bzip2 error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::LZMA) {
+    result = archive_write_add_filter_lzma(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter lzma error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else if (compress_format_ == CompressionFormat::XZ_LZMA2) {
+    result = archive_write_add_filter_xz(arch.get());
+    if (result != ARCHIVE_OK) {
+      logger_->log_error("Archive write add filter xz error %s", archive_error_string(arch.get()));
+      return nullptr;
+    }
+  } else {
+    logger_->log_error("Archive write unsupported compression format");  // reached for any value other than the four handled above
+    return nullptr;
+  }
+  result = archive_write_set_bytes_per_block(arch.get(), 0);  // NOTE(review): 0 presumably turns off libarchive's own output blocking - confirm against the libarchive docs
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write set bytes per block error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  result = archive_write_open(arch.get(), sink_.get(), nullptr, archive_write, nullptr);  // sink_.get() is the client-data pointer, archive_write (declared outside this excerpt) the write callback; no open/close callbacks
+  if (result != ARCHIVE_OK) {
+    logger_->log_error("Archive write open error %s", archive_error_string(arch.get()));
+    return nullptr;
+  }
+  return arch;
+}
+
+bool WriteArchiveStreamImpl::newEntry(const EntryInfo& info) {  // Starts a new archive entry from info; returns false when there is no archive handle or a libarchive call fails.
+  if (!arch_) {
+    return false;
+  }
+  arch_entry_.reset(archive_entry_new());  // replaces any previous entry, which the unique_ptr deleter frees
+  if (!arch_entry_) {
+    logger_->log_error("Failed to create archive entry");
+    return false;
+  }
+  archive_entry_set_pathname(arch_entry_.get(), info.filename.c_str());
+  archive_entry_set_size(arch_entry_.get(), info.size);
+  archive_entry_set_mode(arch_entry_.get(), S_IFREG | 0755);  // every entry is recorded as a regular file with mode 0755
+
+  int result = archive_write_header(arch_.get(), arch_entry_.get());
+  if (result != ARCHIVE_OK) {  // any result other than ARCHIVE_OK is reported as failure; arch_entry_ is left set in that case
+    logger_->log_error("Archive write header error %s", archive_error_string(arch_.get()));
+    return false;
+  }
+  return true;
+}
+
+size_t WriteArchiveStreamImpl::write(const uint8_t* data, size_t len) {
+  if (!arch_ || !arch_entry_) {
+    return STREAM_ERROR;
+  }
+
+  if (len == 0) {
+    return 0;
+  }
+  gsl_Expects(data);
+
+  int result = archive_write_data(arch_.get(), data, len);
+  if (result < 0) {
+    logger_->log_error("Archive write data error %s", archive_error_string(arch_.get()));
+    arch_entry_.reset();
+    arch_.reset();
+    return STREAM_ERROR;
+  }
+
+  return len;

Review comment:
       Should we return `result`?  I would expect the return value to be the number of bytes written, not the size of the input.  This is what other `OutputStream::write()` implementations do.

##########
File path: extensions/libarchive/ReadArchiveStream.h
##########
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+#include "io/OutputStream.h"
+#include "io/ArchiveStream.h"
+#include "core/logging/LoggerConfiguration.h"
+
+#include "archive_entry.h"
+#include "archive.h"
+
+namespace org::apache::nifi::minifi::io {
+
+class ReadArchiveStreamImpl : public ReadArchiveStream {
+  struct archive_read_deleter {  // Deleter functor for archive_ptr (the std::unique_ptr over struct archive declared just below).
+    int operator()(struct archive* ptr) const {
+      return archive_read_free(ptr);  // hands the handle back to libarchive's read-side free function
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_read_deleter>;
+
+  class BufferedReader {  // Reads the wrapped InputStream in chunks through a fixed-size internal buffer.
+   public:
+    explicit BufferedReader(std::shared_ptr<InputStream> input) : input_(std::move(input)) {}  // takes shared ownership of the stream
+
+    std::optional<gsl::span<const uint8_t>> readChunk() {  // Returns the bytes read by one read() call, or std::nullopt if the stream reported an error.
+      size_t result = input_->read(buffer_.data(), buffer_.size());  // asks for at most buffer_.size() (4096) bytes
+      if (io::isError(result)) {
+        return std::nullopt;
+      }
+      return gsl::span<const uint8_t>(buffer_.data(), result);  // the span views buffer_, so the next readChunk() call overwrites its contents
+    }
+
+   private:
+    std::shared_ptr<InputStream> input_;  // source stream
+    std::array<uint8_t, 4096> buffer_;  // backing storage for the span returned by readChunk()
+  };
+
+  archive_ptr createReadArchive();
+
+ public:
+  explicit ReadArchiveStreamImpl(std::shared_ptr<InputStream> input) : reader_(std::move(input)) {  // Moves the stream into reader_; the input_ member declared below is not set by this constructor.
+    arch_ = createReadArchive();  // createReadArchive() is defined outside this excerpt
+  }
+
+  std::shared_ptr<InputStream> input_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadArchiveStream>::getLogger();
+  BufferedReader reader_;
+  archive_ptr arch_;

Review comment:
       Do all these members need to be public?

##########
File path: extensions/libarchive/WriteArchiveStream.h
##########
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <utility>
+#include <string>
+
+#include "io/ArchiveStream.h"
+#include "archive_entry.h"
+#include "archive.h"
+#include "utils/Enum.h"
+#include "core/Core.h"
+#include "logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::io {
+
+SMART_ENUM(CompressionFormat,  // Compression formats accepted by WriteArchiveStreamImpl. NOTE(review): SMART_ENUM presumably ties each enumerator to the string beside it - confirm in utils/Enum.h
+  (GZIP, "gzip"),
+  (LZMA, "lzma"),
+  (XZ_LZMA2, "xz-lzma2"),
+  (BZIP2, "bzip2")
+)
+
+class WriteArchiveStreamImpl: public WriteArchiveStream {
+  struct archive_write_deleter {  // Deleter functor for archive_ptr (the std::unique_ptr over struct archive declared just below).
+    int operator()(struct archive* ptr) const {
+      return archive_write_free(ptr);  // hands the handle back to libarchive's write-side free function
+    }
+  };
+  using archive_ptr = std::unique_ptr<struct archive, archive_write_deleter>;
+  struct archive_entry_deleter {  // Deleter functor for archive_entry_ptr (the std::unique_ptr over struct archive_entry declared just below).
+    void operator()(struct archive_entry* ptr) const {
+      archive_entry_free(ptr);  // frees the entry through libarchive
+    }
+  };
+  using archive_entry_ptr = std::unique_ptr<struct archive_entry, archive_entry_deleter>;
+
+  archive_ptr createWriteArchive();
+
+ public:
+  WriteArchiveStreamImpl(int compress_level, CompressionFormat compress_format, std::shared_ptr<OutputStream> sink)  // Stores the compression settings and the sink, then tries to open the write archive.
+    : compress_level_(compress_level),
+      compress_format_(compress_format),
+      sink_(std::move(sink)) {
+    arch_ = createWriteArchive();  // called in the body, after all members (including logger_) are initialized
+  }
+
+  int compress_level_;
+  CompressionFormat compress_format_;
+  std::shared_ptr<io::OutputStream> sink_;
+  int status_{0};
+  archive_ptr arch_;
+  archive_entry_ptr arch_entry_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<WriteArchiveStreamImpl>::getLogger();

Review comment:
       Do these all need to be public?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org