Posted to issues@nifi.apache.org by "fgerlits (via GitHub)" <gi...@apache.org> on 2023/06/16 15:30:35 UTC

[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

fgerlits commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231175779


##########
docker/test/integration/features/steps/steps.py:
##########
@@ -275,6 +275,11 @@ def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
 
+@given("a file with {size} content is present in \"{path}\"")

Review Comment:
   I think
   ```suggestion
   @given("a file of size {size} is present in \"{path}\"")
   ```
   would be clearer



##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -59,6 +64,11 @@ class PutS3Object : public S3Processor {
   static const core::Property WriteACLUserList;
   static const core::Property CannedACL;
   static const core::Property UsePathStyleAccess;
+  static const core::Property MultipartThreshold;
+  static const core::Property MultipartPartSize;

Review Comment:
   I don't know if "part" is standard AWS terminology, but if it isn't, then "chunk" may be a better word for this.



##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,16 @@ def check_s3_server_object_data(self, container_name, test_data):
         (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   Can `md5_output` be `None` if `code != 0`?  It may be safer to check `code` first, return if non-zero, and then compute and check `file_hash`.



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   Why do we create this zero-length file here, and if it's needed, then why isn't it needed in the `state_directory.empty()` branch?
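   One hypothetical way to make the two branches symmetric would be to drop the placeholder file entirely and only load state when a previous file exists. A sketch only, assuming `commitChanges()` (or the first `storeState()`) creates the file on write; not a tested diff:
   ```cpp
   MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
     if (state_directory.empty()) {
       char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
       state_file_path_ = std::filesystem::path(minifi::utils::file::FileUtils::create_temp_directory(format))
           / (state_id + "-s3-multipart-upload-state.properties");
     } else {
       state_file_path_ = std::filesystem::path(state_directory) / (state_id + "-s3-multipart-upload-state.properties");
       std::filesystem::create_directories(state_file_path_.parent_path());
     }
     // no zero-length placeholder in either branch: load only pre-existing state
     if (std::filesystem::exists(state_file_path_)) {
       loadFile();
     }
   }
   ```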



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "wk" || unit == "wks" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   we could also add "mth"/"mths" for months and "yr"/"yrs" for years, as in the sketch below
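   A sketch of the extended specializations (which abbreviations to accept is the author's call):
   ```cpp
   template<>
   inline bool unit_matches<std::chrono::months>(const std::string& unit) {
     return unit == "mth" || unit == "mths" || unit == "month" || unit == "months";
   }

   template<>
   inline bool unit_matches<std::chrono::years>(const std::string& unit) {
     return unit == "y" || unit == "yr" || unit == "yrs" || unit == "year" || unit == "years";
   }
   ```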



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
+      if (state_key_and_property_name.size() < 2) {
+        logger_->log_error("Invalid property '%s'", property_key);
+        continue;
+      }

Review Comment:
   This error condition is not possible, since we know that `property_key` ends with `".upload_time"`.  Maybe we could check instead that the state key is non-empty, but I don't think that's needed, either.
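   If the split is dropped, the state key can be recovered by stripping the known suffix instead (a sketch, assuming `property_key` is a `std::string`):
   ```cpp
   static constexpr std::string_view kUploadTimeSuffix = ".upload_time";
   // property_key is known to end with ".upload_time" at this point
   const std::string state_key = property_key.substr(0, property_key.size() - kUploadTimeSuffix.size());
   ```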



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;

Review Comment:
   I think this would be more readable:
   ```suggestion
     const auto div_ceil = [](size_t n, size_t d) { if (n % d == 0) return n / d; else return n / d + 1; };
     const size_t part_count = div_ceil(flow_size, upload_state.part_size);
   ```
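   Another common idiom, assuming `flow_size + part_size - 1` cannot overflow, would be:
   ```cpp
   const size_t part_count = (flow_size + upload_state.part_size - 1) / upload_state.part_size;
   ```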



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {

Review Comment:
   this loop body is long enough that by the end of it I've forgotten what `i` was, so I would rename it to something like `part_number`



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);
+  }
+}
+
+std::optional<std::vector<MultipartUpload>> S3Wrapper::listMultipartUploads(const ListMultipartUploadsRequestParameters& params) {
+  std::vector<MultipartUpload> result;
+  std::optional<Aws::S3::Model::ListMultipartUploadsResult> aws_result;
+  Aws::S3::Model::ListMultipartUploadsRequest request;
+  request.SetBucket(params.bucket);
+  do {
+    aws_result = request_sender_->sendListMultipartUploadsRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+    if (!aws_result) {
+      return std::nullopt;
+    }
+    const auto& uploads = aws_result->GetUploads();
+    logger_->log_debug("AWS S3 List operation returned %zu multipart uploads. This result is%s truncated.", uploads.size(), aws_result->GetIsTruncated() ? "" : " not");
+    addListMultipartUploadResults(uploads, params.upload_max_age, result);
+    if (aws_result->GetIsTruncated()) {
+      request.SetKeyMarker(aws_result->GetNextKeyMarker());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return result;
+}
+
+bool S3Wrapper::abortMultipartUpload(const AbortMultipartUploadRequestParameters& params) {
+  Aws::S3::Model::AbortMultipartUploadRequest request;
+  request.WithBucket(params.bucket)
+    .WithKey(params.key)
+    .WithUploadId(params.upload_id);
+  return request_sender_->sendAbortMultipartUploadRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+}
+
+void S3Wrapper::initailizeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id) {

Review Comment:
   typo: "initailize" -> "initialize"



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);

Review Comment:
   why not 
   ```suggestion
     std::array<std::byte, BUFFER_SIZE> buffer{};
   ```
   ?
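   (`std::array` would also avoid the heap allocation, assuming `BUFFER_SIZE` is small enough to live on the stack.)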



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   Isn't this backwards?  We want to list the uploads where `upload.GetInitiated()` is `>= now - *max_upload_age`, and skip those where it is `<` (i.e. the ones which are too old).
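   Spelled out, the filter described above would be something like this (a sketch of that reading, not a verified fix):
   ```cpp
   // skip uploads initiated before (now - max_upload_age), i.e. the ones that are too old
   if (max_upload_age && upload.GetInitiated() < now - *max_upload_age) {
     continue;
   }
   ```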



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;
+    auto stream_ptr = readFlowFileStream(stream, next_read_size, read_size);
+    total_read += read_size;
+
+    Aws::S3::Model::UploadPartRequest upload_part_request;
+    upload_part_request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithPartNumber(i)
+      .WithUploadId(upload_state.upload_id);
+    upload_part_request.SetBody(stream_ptr);
+
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*stream_ptr));
+    upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+    auto upload_part_result = request_sender_->sendUploadPartRequest(upload_part_request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
+    if (!upload_part_result) {
+      logger_->log_error("Failed to upload part %d of %d of S3 object with key '%s'", i, last_part, put_object_params.object_key);
+      return std::nullopt;
+    }
+    result.part_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_parts += 1;
+    upload_state.uploaded_size += read_size;
+    multipart_upload_storage_->storeState(put_object_params.bucket, put_object_params.object_key, upload_state);
+    logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", i, last_part, put_object_params.object_key);
+  }
+
+  multipart_upload_storage_->removeState(put_object_params.bucket, put_object_params.object_key);
   return result;
 }
 
+std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3Wrapper::completeMultipartUpload(const PutObjectRequestParameters& put_object_params,
+    const S3Wrapper::UploadPartsResult& upload_parts_result) {
+  Aws::S3::Model::CompleteMultipartUploadRequest complete_multipart_upload_request;
+  complete_multipart_upload_request.WithBucket(put_object_params.bucket)
+    .WithKey(put_object_params.object_key)
+    .WithUploadId(upload_parts_result.upload_id);
+
+  Aws::S3::Model::CompletedMultipartUpload completed_multipart_upload;
+  for (size_t i = 0; i < upload_parts_result.part_etags.size(); ++i) {
+    Aws::S3::Model::CompletedPart part;
+    part.WithETag(upload_parts_result.part_etags[i])
+      .WithPartNumber(i + 1);
+    completed_multipart_upload.AddParts(part);
+  }
+
+  complete_multipart_upload_request.SetMultipartUpload(completed_multipart_upload);
+
+  return request_sender_->sendCompleteMultipartUploadRequest(complete_multipart_upload_request, put_object_params.credentials,
+    put_object_params.client_config, put_object_params.use_virtual_addressing);
+}
+
+bool S3Wrapper::multipartUploadExistsInS3(const PutObjectRequestParameters& put_object_params) {
+  ListMultipartUploadsRequestParameters params(put_object_params.credentials, put_object_params.client_config);
+  params.bucket = put_object_params.bucket;
+  auto pending_uploads = listMultipartUploads(params);
+  if (!pending_uploads) {
+    return false;
+  }
+
+  return ranges::find_if(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; }) != pending_uploads->end();

Review Comment:
   slightly shorter:
   ```suggestion
     return ranges::any_of(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; });
   ```
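   (`any_of` also states the intent more directly: we only care whether a matching upload exists, not where it is.)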



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));

Review Comment:
   should this be
   ```suggestion
         data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
   ```
   ?  `stream->read()` may return fewer bytes than requested, in which case writing `next_read_size` bytes would append leftover buffer contents.



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }

Review Comment:
   I don't think this check is necessary:  `property_key` came up as a key while iterating over `state_`, so it must exist as a key in `state_`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org