You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "lordgamez (via GitHub)" <gi...@apache.org> on 2023/06/07 16:21:42 UTC

[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

lordgamez opened a new pull request, #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586

   https://issues.apache.org/jira/browse/MINIFICPP-2127
   
   ------------
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
        in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258161080


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   sorry, I meant that we `get` modify then `set` the whole sequence should be synchronized, you are right that the `get` and `set` themselves are synchronized



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258086012


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,160 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::array<std::byte, BUFFER_SIZE> buffer{};
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const auto div_ceil = [](size_t n, size_t d) {
+    if (n % d == 0)
+      return n / d;
+    else
+      return n / d + 1;
+  };
+  const size_t part_count = div_ceil(flow_size, upload_state.part_size);
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t part_number = start_part; part_number <= last_part; ++part_number) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;

Review Comment:
   Updated in 7f27cd5bfdf869971ebab9f706b82405c0e16913



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return std::nullopt;
+  }
+  std::string state_key = bucket + "/" + key;
+  if (!state_map.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  MultipartUploadState state;
+  state.upload_id = state_map[state_key + ".upload_id"];
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_map[state_key + ".upload_time"], stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_map[state_key + ".uploaded_parts"], state.uploaded_parts);
+  core::Property::StringToInt(state_map[state_key + ".uploaded_size"], state.uploaded_size);
+  core::Property::StringToInt(state_map[state_key + ".part_size"], state.part_size);
+  core::Property::StringToInt(state_map[state_key + ".full_size"], state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_map[state_key + ".uploaded_etags"], ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeKey(const std::string& state_key, std::unordered_map<std::string, std::string>& state_map) {
+  state_map.erase(state_key + ".upload_id");
+  state_map.erase(state_key + ".upload_time");
+  state_map.erase(state_key + ".uploaded_parts");
+  state_map.erase(state_key + ".uploaded_size");
+  state_map.erase(state_key + ".part_size");
+  state_map.erase(state_key + ".full_size");
+  state_map.erase(state_key + ".uploaded_etags");
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return;
+  }
+  std::string state_key = bucket + "/" + key;
+  if (!state_map.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key, state_map);
+  state_manager_->set(state_map);
+  state_manager_->commit();
+  state_manager_->persist();
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return;
+  }
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_map) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
+      keys_to_remove.push_back(state_key_and_property_name[0]);

Review Comment:
   Good point, updated in 7f27cd5bfdf869971ebab9f706b82405c0e16913



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1279479619


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -26,6 +26,8 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <chrono>
+#include <atomic>

Review Comment:
   Removed in 59dd271565f5ea1d64cce787dc29305fbe8667e3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#issuecomment-1625231439

   @szaszm Changed the state manager to be able to use it for multipart upload states in b91b756565c30966ac69828115d5a3fc6b0908cb


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1259296115


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +71,158 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {

Review Comment:
   The result of `readFlowFileStream` is used in S3 request objects' `SetBody` member which requires `shared_ptr`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1232009977


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   I agree that it would be optimal to use the state manager. In its current form however as a single transaction is tied to a single onTrigger session it will probably take a larger effort to figure out how should commit and rollback work with multiple states in a single session. Because of this I wouldn't make it part of this PR.
   
   IMO we shouldn't give up continuing failed multipart uploads after a restart as it could be a useful feature for the user if for any reason the system is restarted, the process is killed or maybe the MiNiFi service is restarted. I would also like to make it configurable to avoid any directory permission issues or just to give the user the option to decide where MiNiFi should be allowed to write on the system. Because of this I would go with option 3.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234870455


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   Added a jira ticket for the state management change https://issues.apache.org/jira/browse/MINIFICPP-2140



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1273249255


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);

Review Comment:
   Good point, updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   I didn't think it was necessary as in scenario where there are overlapping onTriggers the worst case scenario would be that the aged off uploads are requested to be deleted multiple times. Of course it could be improved to avoid this scenario, I didn't see how this could be done with a single atomic `compare_and_exchange` call, so I removed the atomic variable and added a mutex for this block instead in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
+  ageOffMultipartUploads(*common_properties);
+
   auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
   if (!put_s3_request_params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
+  ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);

Review Comment:
   You are right at this point this ReadCallback is getting too many parameters and it is simpler to have a single lambda, updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult> S3Wrapper::headObject(const HeadObjectRequestPar
 template<typename ListRequest>
 ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
   ListRequest request;
-  request.SetBucket(params.bucket);
-  request.SetDelimiter(params.delimiter);
-  request.SetPrefix(params.prefix);
+  request.WithBucket(params.bucket)
+    .WithDelimiter(params.delimiter)
+    .WithPrefix(params.prefix);

Review Comment:
   According to the examples I saw like [this](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-s3-crt-integration-tests/BucketAndObjectOperationTest.cpp#L400) it should be safe, as it looks to be equivalent to the `Set*` methods, the only difference being that the returning references makes it simpler to [chain these calls together](https://github.com/aws/aws-sdk-cpp/blob/6cc48868b7558265fd095ce338ce37320a2968e2/tests/aws-cpp-sdk-cloudfront-integration-tests/CloudfrontOperationTest.cpp#L182). But the version you suggested looks better in my opinion so I updated the `With*` calls in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listBucket(const L
 
 std::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
   Aws::S3::Model::GetObjectTaggingRequest request;
-  request.SetBucket(params.bucket);
-  request.SetKey(params.object_key);
+  request.WithBucket(params.bucket)
+    .WithKey(params.object_key);

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> age_off_limit,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    // if age_off_limit is set only list the aged off uploads
+    if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' has not aged off yet", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
+  void initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*> state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const {
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
+  void initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*> state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const {
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, put_object_params.storage_class))
+      .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, put_object_params.server_side_encryption))
+      .WithMetadata(put_object_params.user_metadata_map)
+      .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+      .WithGrantRead(put_object_params.read_permission_user_list)
+      .WithGrantReadACP(put_object_params.read_acl_user_list)
+      .WithGrantWriteACP(put_object_params.write_acl_user_list);
+    request.SetContentType(put_object_params.content_type);
+    setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+    return request;
+  }
+
+  template<typename ResultType>
+  PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+    PutObjectResult put_object_result;
+    // Etags are returned by AWS in quoted form that should be removed
+    put_object_result.etag = minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), '"');
+    put_object_result.version = upload_result.GetVersionId();
+
+    // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+    // s3.expiration only needs the date member of this pair
+    put_object_result.expiration = getExpiration(upload_result.GetExpiration()).expiration_time;
+    put_object_result.ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption());
+    return put_object_result;
+  }

Review Comment:
   Updated in bd5d6d1b50c528061ffc34529e213039bd6dbf7a



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258046049


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return std::nullopt;
+  }
+  std::string state_key = bucket + "/" + key;
+  if (!state_map.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  MultipartUploadState state;
+  state.upload_id = state_map[state_key + ".upload_id"];
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_map[state_key + ".upload_time"], stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_map[state_key + ".uploaded_parts"], state.uploaded_parts);
+  core::Property::StringToInt(state_map[state_key + ".uploaded_size"], state.uploaded_size);
+  core::Property::StringToInt(state_map[state_key + ".part_size"], state.part_size);
+  core::Property::StringToInt(state_map[state_key + ".full_size"], state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_map[state_key + ".uploaded_etags"], ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeKey(const std::string& state_key, std::unordered_map<std::string, std::string>& state_map) {
+  state_map.erase(state_key + ".upload_id");
+  state_map.erase(state_key + ".upload_time");
+  state_map.erase(state_key + ".uploaded_parts");
+  state_map.erase(state_key + ".uploaded_size");
+  state_map.erase(state_key + ".part_size");
+  state_map.erase(state_key + ".full_size");
+  state_map.erase(state_key + ".uploaded_etags");
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return;
+  }
+  std::string state_key = bucket + "/" + key;
+  if (!state_map.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key, state_map);
+  state_manager_->set(state_map);
+  state_manager_->commit();
+  state_manager_->persist();
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_warn("No previous multipart upload state was associated with this processor.");
+    return;
+  }
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_map) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
+      keys_to_remove.push_back(state_key_and_property_name[0]);

Review Comment:
   should we just strip the ".upload_time" suffix instead? as I understand the `property_key` is `<bucket>/<object key>.upload_time` and `object key` is either the `ObjectKey` property set or the `filename` of the flow file, so in the latter case could contain a `.` character for the extension (or hidden file)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231175779


##########
docker/test/integration/features/steps/steps.py:
##########
@@ -275,6 +275,11 @@ def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
 
+@given("a file with {size} content is present in \"{path}\"")

Review Comment:
   I think
   ```suggestion
   @given("a file of size {size} is present in \"{path}\"")
   ```
   would be clearer



##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -59,6 +64,11 @@ class PutS3Object : public S3Processor {
   static const core::Property WriteACLUserList;
   static const core::Property CannedACL;
   static const core::Property UsePathStyleAccess;
+  static const core::Property MultipartThreshold;
+  static const core::Property MultipartPartSize;

Review Comment:
   I don't know if "part" is standard AWS terminology, but if it isn't, then "chunk" may be a better word for this.



##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,16 @@ def check_s3_server_object_data(self, container_name, test_data):
         (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   Can `md5_output` be `None` if `code != 0`?  It may be safer to check `code` first, return if non-zero, and then compute and check `file_hash`.



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   Why do we create this zero-length file here, and it it's needed, then why isn't it needed in the `state_directory.empty()` branch?



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "wk" || unit == "wks" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   we could add "mth", "mths", and "yr", "yrs"



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
+      if (state_key_and_property_name.size() < 2) {
+        logger_->log_error("Invalid property '%s'", property_key);
+        continue;
+      }

Review Comment:
   This error condition is not possible, since we know that `property_key` ends with `".upload_time"`.  Maybe we could check instead that the state key is non-empty, but I don't think that's needed, either.



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;

Review Comment:
   I think this would be more readable:
   ```suggestion
     const auto div_ceil = [](size_t n, size_t d) { if (n % d == 0) return n / d; else return n / d + 1; };
     const size_t part_count = div_ceil(flow_size, upload_state.part_size);
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {

Review Comment:
   this loop body is long enough (for me) to forget by the end of it what `i` was, so I would rename it to something like `part_number`



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);
+  }
+}
+
+std::optional<std::vector<MultipartUpload>> S3Wrapper::listMultipartUploads(const ListMultipartUploadsRequestParameters& params) {
+  std::vector<MultipartUpload> result;
+  std::optional<Aws::S3::Model::ListMultipartUploadsResult> aws_result;
+  Aws::S3::Model::ListMultipartUploadsRequest request;
+  request.SetBucket(params.bucket);
+  do {
+    aws_result = request_sender_->sendListMultipartUploadsRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+    if (!aws_result) {
+      return std::nullopt;
+    }
+    const auto& uploads = aws_result->GetUploads();
+    logger_->log_debug("AWS S3 List operation returned %zu multipart uploads. This result is%s truncated.", uploads.size(), aws_result->GetIsTruncated() ? "" : " not");
+    addListMultipartUploadResults(uploads, params.upload_max_age, result);
+    if (aws_result->GetIsTruncated()) {
+      request.SetKeyMarker(aws_result->GetNextKeyMarker());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return result;
+}
+
+bool S3Wrapper::abortMultipartUpload(const AbortMultipartUploadRequestParameters& params) {
+  Aws::S3::Model::AbortMultipartUploadRequest request;
+  request.WithBucket(params.bucket)
+    .WithKey(params.key)
+    .WithUploadId(params.upload_id);
+  return request_sender_->sendAbortMultipartUploadRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+}
+
+void S3Wrapper::initailizeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id) {

Review Comment:
   typo: "initailize" -> "initialize"



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);

Review Comment:
   why not 
   ```suggestion
     std::array<std::byte, BUFFER_SIZE> buffer{};
   ```
   ?



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   Isn't this backwards?  We want to list the uploads where `upload.GetInitiated()` is `>= now - *max_upload_age`, and skip those where it is `<` (ie. which are too old).



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;
+    auto stream_ptr = readFlowFileStream(stream, next_read_size, read_size);
+    total_read += read_size;
+
+    Aws::S3::Model::UploadPartRequest upload_part_request;
+    upload_part_request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithPartNumber(i)
+      .WithUploadId(upload_state.upload_id);
+    upload_part_request.SetBody(stream_ptr);
+
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*stream_ptr));
+    upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+    auto upload_part_result = request_sender_->sendUploadPartRequest(upload_part_request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
+    if (!upload_part_result) {
+      logger_->log_error("Failed to upload part %d of %d of S3 object with key '%s'", i, last_part, put_object_params.object_key);
+      return std::nullopt;
+    }
+    result.part_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_parts += 1;
+    upload_state.uploaded_size += read_size;
+    multipart_upload_storage_->storeState(put_object_params.bucket, put_object_params.object_key, upload_state);
+    logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", i, last_part, put_object_params.object_key);
+  }
+
+  multipart_upload_storage_->removeState(put_object_params.bucket, put_object_params.object_key);
   return result;
 }
 
+std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3Wrapper::completeMultipartUpload(const PutObjectRequestParameters& put_object_params,
+    const S3Wrapper::UploadPartsResult& upload_parts_result) {
+  Aws::S3::Model::CompleteMultipartUploadRequest complete_multipart_upload_request;
+  complete_multipart_upload_request.WithBucket(put_object_params.bucket)
+    .WithKey(put_object_params.object_key)
+    .WithUploadId(upload_parts_result.upload_id);
+
+  Aws::S3::Model::CompletedMultipartUpload completed_multipart_upload;
+  for (size_t i = 0; i < upload_parts_result.part_etags.size(); ++i) {
+    Aws::S3::Model::CompletedPart part;
+    part.WithETag(upload_parts_result.part_etags[i])
+      .WithPartNumber(i + 1);
+    completed_multipart_upload.AddParts(part);
+  }
+
+  complete_multipart_upload_request.SetMultipartUpload(completed_multipart_upload);
+
+  return request_sender_->sendCompleteMultipartUploadRequest(complete_multipart_upload_request, put_object_params.credentials,
+    put_object_params.client_config, put_object_params.use_virtual_addressing);
+}
+
+bool S3Wrapper::multipartUploadExistsInS3(const PutObjectRequestParameters& put_object_params) {
+  ListMultipartUploadsRequestParameters params(put_object_params.credentials, put_object_params.client_config);
+  params.bucket = put_object_params.bucket;
+  auto pending_uploads = listMultipartUploads(params);
+  if (!pending_uploads) {
+    return false;
+  }
+
+  return ranges::find_if(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; }) != pending_uploads->end();

Review Comment:
   slightly shorter:
   ```suggestion
     return ranges::any_of(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; });
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));

Review Comment:
   should this be
   ```suggestion
         data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
   ```
   ?



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }

Review Comment:
   I don't think this check is necessary:  `property_key` came up as a key while iterating over `state_`, so it must exist as a key in `state_`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234036346


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;
+    auto stream_ptr = readFlowFileStream(stream, next_read_size, read_size);
+    total_read += read_size;
+
+    Aws::S3::Model::UploadPartRequest upload_part_request;
+    upload_part_request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithPartNumber(i)
+      .WithUploadId(upload_state.upload_id);
+    upload_part_request.SetBody(stream_ptr);
+
+    Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*stream_ptr));
+    upload_part_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+    auto upload_part_result = request_sender_->sendUploadPartRequest(upload_part_request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
+    if (!upload_part_result) {
+      logger_->log_error("Failed to upload part %d of %d of S3 object with key '%s'", i, last_part, put_object_params.object_key);
+      return std::nullopt;
+    }
+    result.part_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_etags.push_back(upload_part_result->GetETag());
+    upload_state.uploaded_parts += 1;
+    upload_state.uploaded_size += read_size;
+    multipart_upload_storage_->storeState(put_object_params.bucket, put_object_params.object_key, upload_state);
+    logger_->log_info("Uploaded part %d of %d S3 object with key '%s'", i, last_part, put_object_params.object_key);
+  }
+
+  multipart_upload_storage_->removeState(put_object_params.bucket, put_object_params.object_key);
   return result;
 }
 
+std::optional<Aws::S3::Model::CompleteMultipartUploadResult> S3Wrapper::completeMultipartUpload(const PutObjectRequestParameters& put_object_params,
+    const S3Wrapper::UploadPartsResult& upload_parts_result) {
+  Aws::S3::Model::CompleteMultipartUploadRequest complete_multipart_upload_request;
+  complete_multipart_upload_request.WithBucket(put_object_params.bucket)
+    .WithKey(put_object_params.object_key)
+    .WithUploadId(upload_parts_result.upload_id);
+
+  Aws::S3::Model::CompletedMultipartUpload completed_multipart_upload;
+  for (size_t i = 0; i < upload_parts_result.part_etags.size(); ++i) {
+    Aws::S3::Model::CompletedPart part;
+    part.WithETag(upload_parts_result.part_etags[i])
+      .WithPartNumber(i + 1);
+    completed_multipart_upload.AddParts(part);
+  }
+
+  complete_multipart_upload_request.SetMultipartUpload(completed_multipart_upload);
+
+  return request_sender_->sendCompleteMultipartUploadRequest(complete_multipart_upload_request, put_object_params.credentials,
+    put_object_params.client_config, put_object_params.use_virtual_addressing);
+}
+
+bool S3Wrapper::multipartUploadExistsInS3(const PutObjectRequestParameters& put_object_params) {
+  ListMultipartUploadsRequestParameters params(put_object_params.credentials, put_object_params.client_config);
+  params.bucket = put_object_params.bucket;
+  auto pending_uploads = listMultipartUploads(params);
+  if (!pending_uploads) {
+    return false;
+  }
+
+  return ranges::find_if(*pending_uploads, [&](const auto& upload) { return upload.key == put_object_params.object_key; }) != pending_uploads->end();

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   No, we actually want to skip those listed uploads that have not reached the age limit yet, because we only want to list the uploads that have already passed the max upload age, to delete the old pending uploads.



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);
+  }
+}
+
+std::optional<std::vector<MultipartUpload>> S3Wrapper::listMultipartUploads(const ListMultipartUploadsRequestParameters& params) {
+  std::vector<MultipartUpload> result;
+  std::optional<Aws::S3::Model::ListMultipartUploadsResult> aws_result;
+  Aws::S3::Model::ListMultipartUploadsRequest request;
+  request.SetBucket(params.bucket);
+  do {
+    aws_result = request_sender_->sendListMultipartUploadsRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+    if (!aws_result) {
+      return std::nullopt;
+    }
+    const auto& uploads = aws_result->GetUploads();
+    logger_->log_debug("AWS S3 List operation returned %zu multipart uploads. This result is%s truncated.", uploads.size(), aws_result->GetIsTruncated() ? "" : " not");
+    addListMultipartUploadResults(uploads, params.upload_max_age, result);
+    if (aws_result->GetIsTruncated()) {
+      request.SetKeyMarker(aws_result->GetNextKeyMarker());
+    }
+  } while (aws_result->GetIsTruncated());
+
+  return result;
+}
+
+bool S3Wrapper::abortMultipartUpload(const AbortMultipartUploadRequestParameters& params) {
+  Aws::S3::Model::AbortMultipartUploadRequest request;
+  request.WithBucket(params.bucket)
+    .WithKey(params.key)
+    .WithUploadId(params.upload_id);
+  return request_sender_->sendAbortMultipartUploadRequest(request, params.credentials, params.client_config, params.use_virtual_addressing);
+}
+
+void S3Wrapper::initailizeMultipartUploadStateStorage(const std::string& multipart_temp_dir, const std::string& state_id) {

Review Comment:
   Good catch, updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/tests/PutS3ObjectTests.cpp:
##########
@@ -71,7 +87,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", "[awsCr
     setCredentialsService();
   }
 
-  test_controller.runSession(plan, true);
+  test_controller.runSession(plan);

Review Comment:
   I checked and in the clang job `extensions/aws/tests/PutS3ObjectTests.cpp` file was listed as one of the parameters of clang-tidy so it should have run on that file. I'm not sure why it hasn't complained about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1223096466


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   Updated in latest version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1232579628


##########
extensions/aws/tests/PutS3ObjectTests.cpp:
##########
@@ -71,7 +87,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", "[awsCr
     setCredentialsService();
   }
 
-  test_controller.runSession(plan, true);
+  test_controller.runSession(plan);

Review Comment:
   Does clang-tidy not run on this file?  It usually complains about "static method accessed through instance" when it sees code like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258122129


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   Doesn't the state manager take care of the synchronization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258158757


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   Yes if the flowfile is processed the second time and the same bucket and key then if some parts are already uploaded then the upload will be continued from the next part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234249400


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   Added a commit with this change: b6e6b78e27ebe883cb55b8d7117ebf02cba3fa50



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234035975


##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,16 @@ def check_s3_server_object_data(self, container_name, test_data):
         (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
docker/test/integration/features/steps/steps.py:
##########
@@ -275,6 +275,11 @@ def step_impl(context, content, path):
     context.test.add_test_data(path, content)
 
 
+@given("a file with {size} content is present in \"{path}\"")

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   Good catch, it shouldn't be needed, I also corrected a mistake in the `state_directory.empty()` branch and added a test for it in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }

Review Comment:
   Good point, removed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);
+    } else {
+      loadFile();
+    }
+  }
+}
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  state_[state_key + ".upload_id"] = state.upload_id;
+  state_[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  state_[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  state_[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  state_[state_key + ".part_size"] = std::to_string(state.part_size);
+  state_[state_key + ".full_size"] = std::to_string(state.full_size);
+  state_[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  commitChanges();
+  logger_->log_debug("Updated multipart upload state with key %s", state_key);
+}
+
+std::optional<MultipartUploadState> MultipartUploadStateStorage::getState(const std::string& bucket, const std::string& key) const {
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Failed to get state: Multipart upload state was not found for key '%s'", state_key);
+    return std::nullopt;
+  }
+
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  MultipartUploadState state;
+  state.upload_id = state_.at(state_key + ".upload_id");
+
+  int64_t stored_upload_time = 0;
+  core::Property::StringToInt(state_.at(state_key + ".upload_time"), stored_upload_time);
+  state.upload_time = Aws::Utils::DateTime(stored_upload_time);
+
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_parts"), state.uploaded_parts);
+  core::Property::StringToInt(state_.at(state_key + ".uploaded_size"), state.uploaded_size);
+  core::Property::StringToInt(state_.at(state_key + ".part_size"), state.part_size);
+  core::Property::StringToInt(state_.at(state_key + ".full_size"), state.full_size);
+  state.uploaded_etags = minifi::utils::StringUtils::splitAndTrimRemovingEmpty(state_.at(state_key + ".uploaded_etags"), ";");
+  return state;
+}
+
+void MultipartUploadStateStorage::removeState(const std::string& bucket, const std::string& key) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  std::string state_key = bucket + "/" + key;
+  if (!state_.contains(state_key + ".upload_id")) {
+    logger_->log_warn("Multipart upload state was not found for key '%s'", state_key);
+    return;
+  }
+
+  removeKey(state_key);
+  commitChanges();
+  logger_->log_debug("Removed multipart upload state with key %s", state_key);
+}
+
+void MultipartUploadStateStorage::removeAgedStates(std::chrono::milliseconds multipart_upload_max_age_threshold) {
+  std::lock_guard<std::mutex> lock(state_mutex_);
+  auto age_off_time = Aws::Utils::DateTime::Now() - multipart_upload_max_age_threshold;
+
+  std::vector<std::string> keys_to_remove;
+  for (const auto& [property_key, value] : state_) {
+    if (!minifi::utils::StringUtils::endsWith(property_key, ".upload_time")) {
+      continue;
+    }
+    if (!state_.contains(property_key)) {
+      logger_->log_error("Could not retrieve value for multipart upload cache key '%s'", property_key);
+      continue;
+    }
+    int64_t stored_upload_time{};
+    if (!core::Property::StringToInt(value, stored_upload_time)) {
+      logger_->log_error("Multipart upload cache key '%s' has invalid value '%s'", property_key, value);
+      continue;
+    }
+    auto upload_time = Aws::Utils::DateTime(stored_upload_time);
+    if (upload_time < age_off_time) {
+      auto state_key_and_property_name = minifi::utils::StringUtils::split(property_key, ".");
+      if (state_key_and_property_name.size() < 2) {
+        logger_->log_error("Invalid property '%s'", property_key);
+        continue;
+      }

Review Comment:
   Removed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "wk" || unit == "wks" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   Added in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);

Review Comment:
   I see that in the project we use both implementations, but maybe it feels a bit better to use `std::array` instead. As Marton mentioned for our use cases there shouldn't be any significant differences between the two versions. Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));

Review Comment:
   I think you are right, although the two should be the same, but it's safer to use the actual read bytes. Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;

Review Comment:
   Updated in b45280b22c6f4e74f36cef4fd4b6305f177abe59



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const size_t part_count = flow_size % upload_state.part_size == 0 ? flow_size / upload_state.part_size : flow_size / upload_state.part_size + 1;
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t i = start_part; i <= last_part; ++i) {

Review Comment:
   Renamed in b45280b22c6f4e74f36cef4fd4b6305f177abe59



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231981938


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   I don't really like circumventing the state manager, we should aim to make it work by extending it instead IMO. Not necessarily in this PR, but that works, too. And I want to avoid introducing a property that will need to be removed later, because removing a processor property may break old config.ymls that contain it. (Unless dynamic properties are enabled, but they don't seem to make sense here.)
   
   I think should go with another compromise, my ideas:
   1. Give up the functionality of continuing failed multipart uploads after a restart. It could still continue if the agent wasn't restarted, but restart would restart the upload.
   2. Use a system-wide temporary directory, e.g. /tmp/minifi-multipart-state/processor_uuid
   3. Configure the multipart state dir globally in minifi.properties, because that doesn't break configs when we drop the config property
   
   I gravitate towards 1, but I'm interested in your viewpoint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1259296645


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -97,54 +111,46 @@ class PutS3Object : public S3Processor {
 
   class ReadCallback {
    public:
-    static constexpr uint64_t MAX_SIZE = 5_GiB;
-    static constexpr uint64_t BUFFER_SIZE = 4_KiB;
-
-    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper)
-      : flow_size_(flow_size)
-      , options_(options)
-      , s3_wrapper_(s3_wrapper) {
+    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+          uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)

Review Comment:
   Updated in 0464ed5fc57b119da618142c671f2b985b4ae127



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1273117515


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -29,9 +29,51 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::aws::processors {
 
+namespace {
+class ReadCallback {
+ public:
+  ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+        uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
+    : flow_size_(flow_size),
+      options_(options),
+      s3_wrapper_(s3_wrapper),
+      multipart_threshold_(multipart_threshold),
+      multipart_size_(multipart_size),
+      logger_(logger) {
+  }
+
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
+    try {
+      if (flow_size_ <= multipart_threshold_) {

Review Comment:
   I implemented it this way because NiFi also has the [same implementation](https://github.com/apache/nifi/blob/d201119f0dc2b925d7a2fcc9f09ac80ef8672be8/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java#L613) for the threshold. I think one of the reasons for this is that the default value for it is 5GB which is the maximum size of a single part upload and that should be part of the inclusive size range.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1268163955


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -29,9 +29,51 @@
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 #include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
 
 namespace org::apache::nifi::minifi::aws::processors {
 
+namespace {
+class ReadCallback {
+ public:
+  ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+        uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
+    : flow_size_(flow_size),
+      options_(options),
+      s3_wrapper_(s3_wrapper),
+      multipart_threshold_(multipart_threshold),
+      multipart_size_(multipart_size),
+      logger_(logger) {
+  }
+
+  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
+    try {
+      if (flow_size_ <= multipart_threshold_) {

Review Comment:
   I'd change this to "less-than", it feels more intuitive to have the threshold just hit already be multipart.



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);

Review Comment:
   This log message should include the reason for the abort, to avoid confusion.



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
+  void initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*> state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const {
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)
+      .WithKey(put_object_params.object_key)
+      .WithStorageClass(minifi::utils::at(STORAGE_CLASS_MAP, put_object_params.storage_class))
+      .WithServerSideEncryption(minifi::utils::at(SERVER_SIDE_ENCRYPTION_MAP, put_object_params.server_side_encryption))
+      .WithMetadata(put_object_params.user_metadata_map)
+      .WithGrantFullControl(put_object_params.fullcontrol_user_list)
+      .WithGrantRead(put_object_params.read_permission_user_list)
+      .WithGrantReadACP(put_object_params.read_acl_user_list)
+      .WithGrantWriteACP(put_object_params.write_acl_user_list);
+    request.SetContentType(put_object_params.content_type);
+    setCannedAcl<RequestType>(request, put_object_params.canned_acl);
+    return request;
+  }
+
+  template<typename ResultType>
+  PutObjectResult createPutObjectResult(const ResultType& upload_result) {
+    PutObjectResult put_object_result;
+    // Etags are returned by AWS in quoted form that should be removed
+    put_object_result.etag = minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), '"');
+    put_object_result.version = upload_result.GetVersionId();
+
+    // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
+    // s3.expiration only needs the date member of this pair
+    put_object_result.expiration = getExpiration(upload_result.GetExpiration()).expiration_time;
+    put_object_result.ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption());
+    return put_object_result;
+  }

Review Comment:
   ```suggestion
     PutObjectResult createPutObjectResult(const auto& upload_result) {
       return {
         // Etags are returned by AWS in quoted form that should be removed
         .etag = minifi::utils::StringUtils::removeFramingCharacters(upload_result.GetETag(), '"'),
         .version = upload_result.GetVersionId(),
   
         // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
         // s3.expiration only needs the date member of this pair
         .expiration = getExpiration(upload_result.GetExpiration()).expiration_time,
         .ssealgorithm = getEncryptionString(upload_result.GetServerSideEncryption())
       };
     }
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -298,4 +407,58 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> age_off_limit,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    // if age_off_limit is set only list the aged off uploads
+    if (age_off_limit && now - upload.GetInitiated() <= *age_off_limit) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' has not aged off yet", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }
+
+    MultipartUpload filtered_upload;
+    filtered_upload.key = upload.GetKey();
+    filtered_upload.upload_id = upload.GetUploadId();
+    filtered_uploads.push_back(filtered_upload);

Review Comment:
   I'd do this for simplicity
   ```suggestion
       filtered_uploads.push_back({.key = upload.GetKey(), .upload_id = upload.GetUploadId()});
   ```



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   shouldn't this synchronize the whole aged off block of data between early last_ageoff_time and early now, doing an cmpxchgq at the start and check the interval for expired ageoff parts?



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -263,17 +372,17 @@ std::optional<HeadObjectResult> S3Wrapper::headObject(const HeadObjectRequestPar
 template<typename ListRequest>
 ListRequest S3Wrapper::createListRequest(const ListRequestParameters& params) {
   ListRequest request;
-  request.SetBucket(params.bucket);
-  request.SetDelimiter(params.delimiter);
-  request.SetPrefix(params.prefix);
+  request.WithBucket(params.bucket)
+    .WithDelimiter(params.delimiter)
+    .WithPrefix(params.prefix);

Review Comment:
   I'm not sure how safe is it long-term to rely on `With*` functions modifying the state in-place. I'd do something like this instead:
   ```c++
   return ListRequest{}
       .WithBucket(params.bucket)
       .WithDelimiter(params.delimiter)
       .WithPrefix(params.prefix);
   ```



##########
extensions/aws/s3/S3Wrapper.h:
##########
@@ -195,31 +198,118 @@ struct ListedObjectAttributes : public minifi::utils::ListedObject {
 using HeadObjectRequestParameters = GetObjectRequestParameters;
 using GetObjectTagsParameters = DeleteObjectRequestParameters;
 
+struct ListMultipartUploadsRequestParameters : public RequestParameters {
+  ListMultipartUploadsRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::optional<std::chrono::milliseconds> age_off_limit;  // if set, only list the aged off uploads
+  bool use_virtual_addressing = true;
+};
+
+struct MultipartUpload {
+  std::string key;
+  std::string upload_id;
+};
+
+struct AbortMultipartUploadRequestParameters : public RequestParameters {
+  AbortMultipartUploadRequestParameters(const Aws::Auth::AWSCredentials& creds, const Aws::Client::ClientConfiguration& config)
+    : RequestParameters(creds, config) {}
+  std::string bucket;
+  std::string key;
+  std::string upload_id;
+  bool use_virtual_addressing = true;
+};
+
+class StreamReadException : public Exception {
+ public:
+  explicit StreamReadException(const std::string& error) : Exception(GENERAL_EXCEPTION, error) {}
+};
+
 class S3Wrapper {
  public:
+  static constexpr uint64_t BUFFER_SIZE = 4_KiB;
+
   S3Wrapper();
   explicit S3Wrapper(std::unique_ptr<S3RequestSender>&& request_sender);
 
-  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream);
+  std::optional<PutObjectResult> putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size);
+  std::optional<PutObjectResult> putObjectMultipart(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size, uint64_t multipart_size);
   bool deleteObject(const DeleteObjectRequestParameters& params);
   std::optional<GetObjectResult> getObject(const GetObjectRequestParameters& get_object_params, io::OutputStream& out_body);
   std::optional<std::vector<ListedObjectAttributes>> listBucket(const ListRequestParameters& params);
   std::optional<std::map<std::string, std::string>> getObjectTags(const GetObjectTagsParameters& params);
   std::optional<HeadObjectResult> headObject(const HeadObjectRequestParameters& head_object_params);
+  std::optional<std::vector<MultipartUpload>> listMultipartUploads(const ListMultipartUploadsRequestParameters& params);
+  bool abortMultipartUpload(const AbortMultipartUploadRequestParameters& params);
+  void ageOffLocalS3MultipartUploadStates(std::chrono::milliseconds multipart_upload_max_age_threshold);
+  void initializeMultipartUploadStateStorage(gsl::not_null<minifi::core::StateManager*> state_manager);
 
   virtual ~S3Wrapper() = default;
 
  private:
+  struct UploadPartsResult {
+    std::string upload_id;
+    std::vector<std::string> part_etags;
+  };
+
   static Expiration getExpiration(const std::string& expiration);
 
-  void setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const;
+  template<typename RequestType>
+  void setCannedAcl(RequestType& request, const std::string& canned_acl) const {
+    if (canned_acl.empty()) return;
+
+    const auto it = ranges::find(CANNED_ACL_MAP, canned_acl, [](const auto& kv) { return kv.first; });
+    if (it == CANNED_ACL_MAP.end()) return;
+
+    logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+    request.SetACL(it->second);
+  }
+
+  template<typename RequestType>
+  RequestType createPutObjectRequest(const PutObjectRequestParameters& put_object_params) {
+    RequestType request;
+    request.WithBucket(put_object_params.bucket)

Review Comment:
   ```suggestion
       auto request = RequestType{}.WithBucket(put_object_params.bucket)
   ```



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -235,8 +344,8 @@ std::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listBucket(const L
 
 std::optional<std::map<std::string, std::string>> S3Wrapper::getObjectTags(const GetObjectTagsParameters& params) {
   Aws::S3::Model::GetObjectTaggingRequest request;
-  request.SetBucket(params.bucket);
-  request.SetKey(params.object_key);
+  request.WithBucket(params.bucket)
+    .WithKey(params.object_key);

Review Comment:
   In this case, I'd write it like that:
   ```c++
   auto request = Aws::S3::Model::GetObjectTaggingRequest{}.WithBucket(params.bucket).WithKey(params.object_key);
   ```
   Or to modify in-place, I'd add self-assignment, just in case it stops returning a modified reference to `*this`, and starts returning a new object:
   ```c++
   request = request.WithBucket(params.bucket);
   ```



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -200,13 +306,15 @@ void PutS3Object::onTrigger(const std::shared_ptr<core::ProcessContext> &context
     return;
   }
 
+  ageOffMultipartUploads(*common_properties);
+
   auto put_s3_request_params = buildPutS3RequestParams(context, flow_file, *common_properties);
   if (!put_s3_request_params) {
     session->transfer(flow_file, Failure);
     return;
   }
 
-  PutS3Object::ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_);
+  ReadCallback callback(flow_file->getSize(), *put_s3_request_params, s3_wrapper_, multipart_threshold_, multipart_size_, *logger_);

Review Comment:
   You could also inject `this` to keep things simpler, or replace the whole callback class with an inline lambda with `[this, &flow_file, &put_s3_request_params]` capture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258108085


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   the `MultipartUploadStateStorage` instance seems to be a processor member, so this could be called by 2 `onTrigger`s simultaneously, the `fetch/modify/store` in most of these member functions needs synchronization 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258202433


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +70,160 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::array<std::byte, BUFFER_SIZE> buffer{};
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const auto div_ceil = [](size_t n, size_t d) {

Review Comment:
   Thanks, I wasn't aware of that, updated in e5b30fa6df2d0fbd54a85e74dd3ab9995dafe4c9



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1257924453


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,160 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::array<std::byte, BUFFER_SIZE> buffer{};
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const auto div_ceil = [](size_t n, size_t d) {
+    if (n % d == 0)
+      return n / d;
+    else
+      return n / d + 1;
+  };
+  const size_t part_count = div_ceil(flow_size, upload_state.part_size);
+  size_t total_read = 0;
+  const size_t start_part = upload_state.uploaded_parts + 1;
+  const size_t last_part = start_part + part_count - 1;
+  for (size_t part_number = start_part; part_number <= last_part; ++part_number) {
+    uint64_t read_size{};
+    const auto remaining = flow_size - total_read;
+    const auto next_read_size = remaining < upload_state.part_size ? remaining : upload_state.part_size;

Review Comment:
   `std::min`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1279305420


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -26,6 +26,8 @@
 #include <string>
 #include <utility>
 #include <vector>
+#include <chrono>
+#include <atomic>

Review Comment:
   I think `<atomic>` is no longer needed, unless I'm missing something



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234094500


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   The lister functions list all of the pending uploads by default if the age limit is not set, which is an optional parameter. I wanted to reuse it for the age off lister as well, because that's the only difference. I agree we that the namings should be better in this case I can work on that a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234106980


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   Updated the variable names and added some comments in 5624af2c3ed7c36ecb55193c739d6845bf26d877



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "fgerlits (via GitHub)" <gi...@apache.org>.
fgerlits commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234048245


##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,18 @@ def check_s3_server_object_data(self, container_name, test_data):
         (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
+        if code != 0:
+            return False
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   we could remove the second `code == 0` check now:
   ```suggestion
           if code != 0:
               return False
           file_hash = md5_output.split(' ')[0].strip()
           return file_hash == expected_file_hash
   ```



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   it's commented out now; it would be better to delete the line



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -297,4 +404,57 @@ FetchObjectResult S3Wrapper::fillFetchObjectResult(const GetObjectRequestParamet
   return result;
 }
 
+void S3Wrapper::addListMultipartUploadResults(const Aws::Vector<Aws::S3::Model::MultipartUpload>& uploads, std::optional<std::chrono::milliseconds> max_upload_age,
+    std::vector<MultipartUpload>& filtered_uploads) {
+  const auto now = Aws::Utils::DateTime::Now();
+  for (const auto& upload : uploads) {
+    if (max_upload_age && now - upload.GetInitiated() <= *max_upload_age) {
+      logger_->log_debug("Multipart upload with key '%s' and upload id '%s' did not meet the age limit", upload.GetKey(), upload.GetUploadId());
+      continue;
+    }

Review Comment:
   So `multipartUploadExistsInS3()`, `listMultipartUploads()` and `addListMultipartUploadResults()` only list/check uploads which have aged off?  In that case, the names of the functions, and also local variables like `result` and `filtered_upload` should reflect this, to make the logic clearer.  Also, it would help people reading the code if the log message was something like "... has not aged off yet" instead of the vaguer ".. did not meet the age limit".
   



##########
extensions/aws/tests/PutS3ObjectTests.cpp:
##########
@@ -71,7 +87,7 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", "[awsCr
     setCredentialsService();
   }
 
-  test_controller.runSession(plan, true);
+  test_controller.runSession(plan);

Review Comment:
   that's weird, but not important



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258127052


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -97,54 +111,61 @@ class PutS3Object : public S3Processor {
 
   class ReadCallback {
    public:
-    static constexpr uint64_t MAX_SIZE = 5_GiB;
-    static constexpr uint64_t BUFFER_SIZE = 4_KiB;
-
-    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper)
-      : flow_size_(flow_size)
-      , options_(options)
-      , s3_wrapper_(s3_wrapper) {
+    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+          uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
+      : flow_size_(flow_size),
+        options_(options),
+        s3_wrapper_(s3_wrapper),
+        multipart_threshold_(multipart_threshold),
+        multipart_size_(multipart_size),
+        logger_(logger) {
     }
 
     int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
-      if (flow_size_ > MAX_SIZE) {
-        return -1;
-      }
-      std::vector<std::byte> buffer;
-      buffer.resize(BUFFER_SIZE);
-      auto data_stream = std::make_shared<std::stringstream>();
-      read_size_ = 0;
-      while (read_size_ < flow_size_) {
-        const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
-        const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
-        if (io::isError(read_ret)) {
-          return -1;
-        }
-        if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
-          read_size_ += read_ret;
+      try {
+        if (flow_size_ <= multipart_threshold_) {
+          logger_.log_info("Uploading S3 Object '%s' in a single upload", options_.object_key);

Review Comment:
   I think it would make sense to move the implementation to the .cpp file.



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +70,160 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::array<std::byte, BUFFER_SIZE> buffer{};
+  auto data_stream = std::make_shared<Aws::StringStream>();
+  uint64_t read_size = 0;
+  while (read_size < read_limit) {
+    const auto next_read_size = (std::min)(read_limit - read_size, BUFFER_SIZE);
+    const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
+    if (io::isError(read_ret)) {
+      throw StreamReadException("Reading flow file inputstream failed!");
+    }
+    if (read_ret > 0) {
+      data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(read_ret));
+      read_size += read_ret;
+    } else {
+      break;
+    }
+  }
+  read_size_out = read_size;
+  return data_stream;
+}
+
+std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream, uint64_t flow_size) {
+  uint64_t read_size{};
+  auto data_stream = readFlowFileStream(stream, flow_size, read_size);
+  auto request = createPutObjectRequest<Aws::S3::Model::PutObjectRequest>(put_object_params);
   request.SetBody(data_stream);
-  request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
-  request.SetGrantRead(put_object_params.read_permission_user_list);
-  request.SetGrantReadACP(put_object_params.read_acl_user_list);
-  request.SetGrantWriteACP(put_object_params.write_acl_user_list);
-  setCannedAcl(request, put_object_params.canned_acl);
 
   auto aws_result = request_sender_->sendPutObjectRequest(request, put_object_params.credentials, put_object_params.client_config, put_object_params.use_virtual_addressing);
   if (!aws_result) {
     return std::nullopt;
   }
 
-  PutObjectResult result;
-  // Etags are returned by AWS in quoted form that should be removed
-  result.etag = minifi::utils::StringUtils::removeFramingCharacters(aws_result->GetETag(), '"');
-  result.version = aws_result->GetVersionId();
+  return createPutObjectResult(*aws_result);
+}
 
-  // GetExpiration returns a string pair with a date and a ruleid in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format
-  // s3.expiration only needs the date member of this pair
-  result.expiration = getExpiration(aws_result->GetExpiration()).expiration_time;
-  result.ssealgorithm = getEncryptionString(aws_result->GetServerSideEncryption());
+std::optional<S3Wrapper::UploadPartsResult> S3Wrapper::uploadParts(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<io::InputStream>& stream,
+    MultipartUploadState upload_state) {
+  stream->seek(upload_state.uploaded_size);
+  S3Wrapper::UploadPartsResult result;
+  result.upload_id = upload_state.upload_id;
+  result.part_etags = upload_state.uploaded_etags;
+  const auto flow_size = upload_state.full_size - upload_state.uploaded_size;
+  const auto div_ceil = [](size_t n, size_t d) {

Review Comment:
   This utility already exists in GeneralUtils.h, called `intdiv_ceil`. It's more general, works with both signed and unsigned integer types, but should work the same as this in the `size_t` case.
   
   You may want to double-check that the denominator isn't zero, and handle that case appropriately.



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   What happens if a partial state is committed and persisted, but the flow file is rolled back due to an exception down the line? Do we try to reuse completed part transfers when the flow file is processed a second time after rollback?



##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   I would restrict the accepted set of spellings
   - month: "month", "months"
   - year: "year", "years"
   
   For the same reason I detailed in the first comment. We can extend the accepted formats if NiFi ever decides to add these units with alternate spellings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231548116


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   First I implemented it with the state manager, but there was a problem with that. The state manager only commits the state at session commit and if the process is killed or the upload fails and the session is rolled back then the states between the part uploads are lost. Even when I tried to commit the state manager manually it caused an exception at the end of the trigger because the session commit could not commit (or rollback in case of a failure) an already commited session. It may be the same issue in NiFi because they also use a separate temporary directory for this state management.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1259278368


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   I would still keep the mutex for the `get` operation as well because even though the `KeyValueStateManager` is transactional it is not thread safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258122129


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   Doesn't the state manager take care of the synchronization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm closed pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm closed pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1279299975


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -186,6 +250,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_.load() < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   I didn't do too well on phrasing this comment, but my concern was updating `last_ageoff_time_` at the end, instead of updating early and atomically/locked, then proceeding with the work on the current thread. I think it could more easily happen with the old version that multiple threads start deleting the same aged off uploads. The current version avoids this, even though it doesn't seem critical.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1255506187


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -187,6 +208,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   No we don't, made the member atomic in b9fe9bcc10d1415c320d5289f5e2a5b6d7219638



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231135508


##########
extensions/aws/tests/MultipartUploadStateStorageTest.cpp:
##########
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestBase.h"
+#include "Catch.h"
+#include "s3/MultipartUploadStateStorage.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class MultipartUploadStateStorageTestFixture {
+ public:
+  MultipartUploadStateStorageTestFixture()
+    : upload_storage_(test_controller.createTempDirectory().string(), "test_id") {
+  }
+
+ protected:
+  TestController test_controller;
+  minifi::aws::s3::MultipartUploadStateStorage upload_storage_;
+};
+
+TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current key state", "[s3StateStorage]") {
+  REQUIRE(upload_storage_.getState("test_bucket", "key") == std::nullopt);
+  minifi::aws::s3::MultipartUploadState state;
+  state.upload_id = "id1";
+  state.uploaded_parts = 2;
+  state.uploaded_size = 100_MiB;
+  state.part_size = 50_MiB;
+  state.full_size = 200_MiB;
+  state.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
+  state.uploaded_etags = {"etag1", "etag2"};
+  upload_storage_.storeState("test_bucket", "key", state);
+  REQUIRE(*upload_storage_.getState("test_bucket", "key") == state);
+}
+
+TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state from multiple keys and buckets", "[s3StateStorage]") {
+  minifi::aws::s3::MultipartUploadState state1;
+  state1.upload_id = "id1";
+  state1.uploaded_parts = 3;
+  state1.uploaded_size = 150_MiB;
+  state1.part_size = 50_MiB;
+  state1.full_size = 200_MiB;
+  state1.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
+  state1.uploaded_etags = {"etag1", "etag2", "etag3"};
+  upload_storage_.storeState("old_bucket", "key1", state1);

Review Comment:
   Why did you use `old_bucket` here? Isn't this meant to test that storing states under the same bucket and key overwrites the old one?



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+

Review Comment:
   We should wrap in int64_t and use PRId64. See tick count representation of standard types here: https://en.cppreference.com/w/cpp/chrono/duration
   ```suggestion
     logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRId64 " ms", int64_t{multipart_upload_ageoff_interval_.count()});
   
     multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
     logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRId64 " ms", int64_t{multipart_upload_max_age_threshold_.count()});
   
   ```



##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -81,7 +96,7 @@ class PutS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

Review Comment:
   Can we keep multi-threaded operation?



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   Why do we need a temp dir? We have standard state management with multiple backends.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258152157


##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+void MultipartUploadStateStorage::storeState(const std::string& bucket, const std::string& key, const MultipartUploadState& state) {
+  std::unordered_map<std::string, std::string> stored_state;
+  state_manager_->get(stored_state);
+  std::string state_key = bucket + "/" + key;
+  stored_state[state_key + ".upload_id"] = state.upload_id;
+  stored_state[state_key + ".upload_time"] = std::to_string(state.upload_time.Millis());
+  stored_state[state_key + ".uploaded_parts"] = std::to_string(state.uploaded_parts);
+  stored_state[state_key + ".uploaded_size"] = std::to_string(state.uploaded_size);
+  stored_state[state_key + ".part_size"] = std::to_string(state.part_size);
+  stored_state[state_key + ".full_size"] = std::to_string(state.full_size);
+  stored_state[state_key + ".uploaded_etags"] = minifi::utils::StringUtils::join(";", state.uploaded_etags);
+  state_manager_->set(stored_state);
+  state_manager_->commit();
+  state_manager_->persist();

Review Comment:
   For some reason I thought it is taken care of by the state manager. Added synchronization in 84f9245914ed9cead9574f29d7c7d9842aa59f5b



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258201957


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -97,54 +111,61 @@ class PutS3Object : public S3Processor {
 
   class ReadCallback {
    public:
-    static constexpr uint64_t MAX_SIZE = 5_GiB;
-    static constexpr uint64_t BUFFER_SIZE = 4_KiB;
-
-    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper)
-      : flow_size_(flow_size)
-      , options_(options)
-      , s3_wrapper_(s3_wrapper) {
+    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+          uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)
+      : flow_size_(flow_size),
+        options_(options),
+        s3_wrapper_(s3_wrapper),
+        multipart_threshold_(multipart_threshold),
+        multipart_size_(multipart_size),
+        logger_(logger) {
     }
 
     int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
-      if (flow_size_ > MAX_SIZE) {
-        return -1;
-      }
-      std::vector<std::byte> buffer;
-      buffer.resize(BUFFER_SIZE);
-      auto data_stream = std::make_shared<std::stringstream>();
-      read_size_ = 0;
-      while (read_size_ < flow_size_) {
-        const auto next_read_size = (std::min)(flow_size_ - read_size_, BUFFER_SIZE);
-        const auto read_ret = stream->read(gsl::make_span(buffer).subspan(0, next_read_size));
-        if (io::isError(read_ret)) {
-          return -1;
-        }
-        if (read_ret > 0) {
-          data_stream->write(reinterpret_cast<char*>(buffer.data()), gsl::narrow<std::streamsize>(next_read_size));
-          read_size_ += read_ret;
+      try {
+        if (flow_size_ <= multipart_threshold_) {
+          logger_.log_info("Uploading S3 Object '%s' in a single upload", options_.object_key);

Review Comment:
   Updated in e5b30fa6df2d0fbd54a85e74dd3ab9995dafe4c9



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1231973493


##########
extensions/aws/tests/MultipartUploadStateStorageTest.cpp:
##########
@@ -0,0 +1,141 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TestBase.h"
+#include "Catch.h"
+#include "s3/MultipartUploadStateStorage.h"
+
+namespace org::apache::nifi::minifi::test {
+
+class MultipartUploadStateStorageTestFixture {
+ public:
+  MultipartUploadStateStorageTestFixture()
+    : upload_storage_(test_controller.createTempDirectory().string(), "test_id") {
+  }
+
+ protected:
+  TestController test_controller;
+  minifi::aws::s3::MultipartUploadStateStorage upload_storage_;
+};
+
+TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Store and get current key state", "[s3StateStorage]") {
+  REQUIRE(upload_storage_.getState("test_bucket", "key") == std::nullopt);
+  minifi::aws::s3::MultipartUploadState state;
+  state.upload_id = "id1";
+  state.uploaded_parts = 2;
+  state.uploaded_size = 100_MiB;
+  state.part_size = 50_MiB;
+  state.full_size = 200_MiB;
+  state.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
+  state.uploaded_etags = {"etag1", "etag2"};
+  upload_storage_.storeState("test_bucket", "key", state);
+  REQUIRE(*upload_storage_.getState("test_bucket", "key") == state);
+}
+
+TEST_CASE_METHOD(MultipartUploadStateStorageTestFixture, "Get key upload state from multiple keys and buckets", "[s3StateStorage]") {
+  minifi::aws::s3::MultipartUploadState state1;
+  state1.upload_id = "id1";
+  state1.uploaded_parts = 3;
+  state1.uploaded_size = 150_MiB;
+  state1.part_size = 50_MiB;
+  state1.full_size = 200_MiB;
+  state1.upload_time = Aws::Utils::DateTime::CurrentTimeMillis();
+  state1.uploaded_etags = {"etag1", "etag2", "etag3"};
+  upload_storage_.storeState("old_bucket", "key1", state1);

Review Comment:
   You are right, I updated the test with the overwrite, but also kept the different bucket and key id state to check if those are not overwritten in a63cb4a856a9ed576f994dc849c57a29a8044ae0



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+

Review Comment:
   Thanks, updated in a63cb4a856a9ed576f994dc849c57a29a8044ae0



##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -81,7 +96,7 @@ class PutS3Object : public S3Processor {
   EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true;
   EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
   EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
-  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

Review Comment:
   Good catch, I forgot to revert this after discussing it, updated in a63cb4a856a9ed576f994dc849c57a29a8044ae0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1233691783


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   I would use MINIFI_HOME subdir similarly to `nifi.python.processor.dir` and other config properties.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234249400


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   Added a commit with this change: ea2548e618b539facccf226fbc417c6d46fe71ee



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1232586633


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -77,7 +78,31 @@ void PutS3Object::onSchedule(const std::shared_ptr<core::ProcessContext> &contex
     use_virtual_addressing_ = !*use_path_style_access;
   }
 
+  context->getProperty(MultipartThreshold.getName(), multipart_threshold_);
+  if (multipart_threshold_ > getMaxUploadSize() || multipart_threshold_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Threshold is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Threshold %" PRIu64, multipart_threshold_);
+  context->getProperty(MultipartPartSize.getName(), multipart_size_);
+  if (multipart_size_ > getMaxUploadSize() || multipart_size_ < getMinPartSize()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multipart Part Size is not between the valid 5MB and 5GB range!");
+  }
+  logger_->log_debug("PutS3Object: Multipart Size %" PRIu64, multipart_size_);
+
+
+  multipart_upload_ageoff_interval_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadAgeOffInterval.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Ageoff Interval %" PRIu64 " ms", multipart_upload_ageoff_interval_.count());
+
+  multipart_upload_max_age_threshold_ = minifi::utils::getRequiredPropertyOrThrow<core::TimePeriodValue>(*context, MultipartUploadMaxAgeThreshold.getName()).getMilliseconds();
+  logger_->log_debug("PutS3Object: Multipart Upload Max Age Threshold %" PRIu64 " ms", multipart_upload_max_age_threshold_.count());
+
   fillUserMetadata(context);
+
+  std::string multipart_temp_dir;
+  context->getProperty(TemporaryDirectoryMultipartState.getName(), multipart_temp_dir);
+
+
+  s3_wrapper_.initailizeMultipartUploadStateStorage(multipart_temp_dir, getUUIDStr());

Review Comment:
   What would you default to, when it's not configured? OS tempdir, or MINIFI_HOME subdir?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1232540073


##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +69,155 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {
+  std::vector<std::byte> buffer;
+  buffer.resize(BUFFER_SIZE);

Review Comment:
   With a sufficiently large buffer size, this risks stack overflow. Not sure about the stack sizes in practice, but I'd only expect this to be a problem on embedded devices with the current 4K buffer size.
   Since we're doing IO in this function, the overhead of an extra allocation is probably insignificant compared to any IO overhead. But there is no harm in changing it. I'm neutral on this, just wanted to share some thoughts and add nuance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1234106443


##########
docker/test/integration/cluster/checkers/AwsChecker.py:
##########
@@ -29,6 +29,18 @@ def check_s3_server_object_data(self, container_name, test_data):
         (code, file_data) = self.container_communicator.execute_command(container_name, ["cat", s3_mock_dir + "/binaryData"])
         return code == 0 and file_data == test_data
 
+    @retry_check()
+    def check_s3_server_object_hash(self, container_name: str, expected_file_hash: str):
+        (code, output) = self.container_communicator.execute_command(container_name, ["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
+        if code != 0:
+            return False
+        s3_mock_dir = output.strip()
+        (code, md5_output) = self.container_communicator.execute_command(container_name, ["md5sum", s3_mock_dir + "/binaryData"])
+        if code != 0:
+            return False
+        file_hash = md5_output.split(' ')[0].strip()
+        return code == 0 and file_hash == expected_file_hash

Review Comment:
   Removed in 5624af2c3ed7c36ecb55193c739d6845bf26d877



##########
extensions/aws/s3/MultipartUploadStateStorage.cpp:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MultipartUploadStateStorage.h"
+
+#include <unordered_map>
+
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::aws::s3 {
+
+MultipartUploadStateStorage::MultipartUploadStateStorage(const std::string& state_directory, const std::string& state_id) {
+  if (state_directory.empty()) {
+    char format[] = "/var/tmp/nifi-minifi-cpp.s3-multipart-upload.XXXXXX";
+    state_file_path_ = minifi::utils::file::FileUtils::create_temp_directory(format);
+  } else {
+    state_file_path_ = std::filesystem::path(state_directory) / std::string(state_id + "-s3-multipart-upload-state.properties");
+    if (!std::filesystem::exists(state_file_path_)) {
+      std::filesystem::create_directories(state_file_path_.parent_path());
+      std::ofstream ofs(state_file_path_);

Review Comment:
   My mistake, removed in 5624af2c3ed7c36ecb55193c739d6845bf26d877



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258204617


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   Makes sense, I added it due to @fgerlits 's request, but I reverted in e5b30fa6df2d0fbd54a85e74dd3ab9995dafe4c9. I kept only the `y` for year, I think that is a simple and trivial format for it, that should be okay.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1258328212


##########
extensions/aws/processors/PutS3Object.h:
##########
@@ -97,54 +111,46 @@ class PutS3Object : public S3Processor {
 
   class ReadCallback {
    public:
-    static constexpr uint64_t MAX_SIZE = 5_GiB;
-    static constexpr uint64_t BUFFER_SIZE = 4_KiB;
-
-    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper)
-      : flow_size_(flow_size)
-      , options_(options)
-      , s3_wrapper_(s3_wrapper) {
+    ReadCallback(uint64_t flow_size, const minifi::aws::s3::PutObjectRequestParameters& options, aws::s3::S3Wrapper& s3_wrapper,
+          uint64_t multipart_threshold, uint64_t multipart_size, core::logging::Logger& logger)

Review Comment:
   I think the class definition could be moved to the .cpp file, too. I don't think it's used anywhere outside this processor.



##########
extensions/aws/s3/S3Wrapper.cpp:
##########
@@ -75,42 +71,158 @@ std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption
   return "";
 }
 
-std::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, const std::shared_ptr<Aws::IOStream>& data_stream) {
-  Aws::S3::Model::PutObjectRequest request;
-  request.SetBucket(put_object_params.bucket);
-  request.SetKey(put_object_params.object_key);
-  request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
-  request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
-  request.SetContentType(put_object_params.content_type);
-  request.SetMetadata(put_object_params.user_metadata_map);
+std::shared_ptr<Aws::StringStream> S3Wrapper::readFlowFileStream(const std::shared_ptr<io::InputStream>& stream, uint64_t read_limit, uint64_t& read_size_out) {

Review Comment:
   We could return the StringStream by value with no downside as far as I can see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "adamdebreceni (via GitHub)" <gi...@apache.org>.
adamdebreceni commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1255489940


##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -187,6 +208,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   `IsSingleThreaded is false for this processor, do we somewhere synchronize the access to `last_ageoff_time_`?



##########
extensions/aws/processors/PutS3Object.cpp:
##########
@@ -187,6 +208,48 @@ void PutS3Object::setAttributes(
   }
 }
 
+void PutS3Object::ageOffMultipartUploads(const CommonProperties &common_properties) {
+  const auto now = std::chrono::system_clock::now();
+  if (now - last_ageoff_time_ < multipart_upload_ageoff_interval_) {
+    logger_->log_debug("Multipart Upload Age off interval still in progress, not checking obsolete multipart uploads.");
+    return;
+  }
+
+  logger_->log_trace("Listing aged off multipart uploads still in progress.");
+  aws::s3::ListMultipartUploadsRequestParameters list_params(common_properties.credentials, *client_config_);
+  list_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+  list_params.bucket = common_properties.bucket;
+  list_params.age_off_limit = multipart_upload_max_age_threshold_;
+  list_params.use_virtual_addressing = use_virtual_addressing_;
+  auto aged_off_uploads_in_progress = s3_wrapper_.listMultipartUploads(list_params);
+  if (!aged_off_uploads_in_progress) {
+    logger_->log_error("Listing aged off multipart uploads failed!");
+    return;
+  }
+
+  logger_->log_info("Found %d aged off pending multipart upload jobs in bucket '%s'", aged_off_uploads_in_progress->size(), common_properties.bucket);
+  size_t aborted = 0;
+  for (const auto& upload : *aged_off_uploads_in_progress) {
+    logger_->log_info("Aborting multipart upload with key '%s' and upload id '%s' in bucket '%s'", upload.key, upload.upload_id, common_properties.bucket);
+    aws::s3::AbortMultipartUploadRequestParameters abort_params(common_properties.credentials, *client_config_);
+    abort_params.setClientConfig(common_properties.proxy, common_properties.endpoint_override_url);
+    abort_params.bucket = common_properties.bucket;
+    abort_params.key = upload.key;
+    abort_params.upload_id = upload.upload_id;
+    abort_params.use_virtual_addressing = use_virtual_addressing_;
+    if (!s3_wrapper_.abortMultipartUpload(abort_params)) {
+       logger_->log_error("Failed to abort multipart upload with key '%s' and upload id '%s' in bucket '%s'", abort_params.key, abort_params.upload_id, abort_params.bucket);
+       continue;
+    }
+    ++aborted;
+  }
+  if (aborted > 0) {
+    logger_->log_info("Aborted %d pending multipart upload jobs in bucket '%s'", aborted, common_properties.bucket);
+  }
+  s3_wrapper_.ageOffLocalS3MultipartUploadStates(multipart_upload_max_age_threshold_);
+  last_ageoff_time_ = now;

Review Comment:
   `IsSingleThreaded` is false for this processor, do we somewhere synchronize the access to `last_ageoff_time_`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1223088139


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   I'd remove "mon" and "mons", at least until they appear in NiFi. They're not clear/common enough IMO, and they're one typo away from "min"/"mins". 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "lordgamez (via GitHub)" <gi...@apache.org>.
lordgamez commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1223080276


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   Thanks, added additional formats in 2e8735fd94374c769c062ceae656383e547171dd



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1586: MINIFICPP-2127 Add multipart upload support for PutS3Object processor

Posted by "szaszm (via GitHub)" <gi...@apache.org>.
szaszm commented on code in PR #1586:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1586#discussion_r1222009549


##########
libminifi/include/utils/TimeUtil.h:
##########
@@ -189,6 +189,20 @@ inline bool unit_matches<std::chrono::days>(const std::string& unit) {
   return unit == "d" || unit == "day" || unit == "days";
 }
 
+template<>
+inline bool unit_matches<std::chrono::weeks>(const std::string& unit) {
+  return unit == "w" || unit == "week" || unit == "weeks";
+}
+
+template<>
+inline bool unit_matches<std::chrono::months>(const std::string& unit) {
+  return unit == "mon" || unit == "month" || unit == "months";
+}
+
+template<>
+inline bool unit_matches<std::chrono::years>(const std::string& unit) {
+  return unit == "y" || unit == "year" || unit == "years";
+}

Review Comment:
   nifi equivalent: https://github.com/apache/nifi/blob/main/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/FormatUtils.java#L45
   
   We should try to match that for maximum compatibility. I don't really mind if we accept/support more, but if a unit is supported, then we shouldn't accept less. And if NiFi ever decides to use other suffixes for the things that are not there yet (microsec/mon/year), then it becomes an awkward backwards compatibility vs NiFi compatibility question.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org