You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/01/19 10:59:41 UTC

[GitHub] [nifi-minifi-cpp] lordgamez commented on a change in pull request #975: MINIFICPP-1400 Create ListS3 processor

lordgamez commented on a change in pull request #975:
URL: https://github.com/apache/nifi-minifi-cpp/pull/975#discussion_r560091816



##########
File path: extensions/aws/s3/S3Wrapper.cpp
##########
@@ -30,46 +37,253 @@ namespace minifi {
 namespace aws {
 namespace s3 {
 
-minifi::utils::optional<Aws::S3::Model::PutObjectResult> S3Wrapper::sendPutObjectRequest(const Aws::S3::Model::PutObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.PutObject(request);
+/**
+ * Fills the path related members from an object key: the key itself is stored as absolute_path,
+ * and the two parts returned by split_path (called with force_posix set) are stored in path and filename.
+ */
+void HeadObjectResult::setFilePaths(const std::string& key) {
+  absolute_path = key;
+  auto key_parts = minifi::utils::file::FileUtils::split_path(key, true /*force_posix*/);
+  path = std::move(std::get<0>(key_parts));
+  filename = std::move(std::get<1>(key_parts));
+}
+
+// Default constructor: requests are sent through a newly created S3ClientRequestSender.
+S3Wrapper::S3Wrapper()
+  : request_sender_(minifi::utils::make_unique<S3ClientRequestSender>()) {
+}
+
+// Constructs a wrapper that sends its requests through the given sender; ownership of the sender is moved in.
+S3Wrapper::S3Wrapper(std::unique_ptr<S3RequestSender> request_sender)
+  : request_sender_(std::move(request_sender)) {
+}
+
+// Forwards the AWS credentials to the request sender.
+void S3Wrapper::setCredentials(const Aws::Auth::AWSCredentials& cred) {
+  auto& sender = *request_sender_;
+  sender.setCredentials(cred);
+}
+
+// Forwards the region to the request sender.
+void S3Wrapper::setRegion(const Aws::String& region) {
+  auto& sender = *request_sender_;
+  sender.setRegion(region);
+}
+
+// Forwards the timeout value to the request sender.
+void S3Wrapper::setTimeout(uint64_t timeout) {
+  auto& sender = *request_sender_;
+  sender.setTimeout(timeout);
+}
+
+// Forwards the endpoint override URL to the request sender.
+void S3Wrapper::setEndpointOverrideUrl(const Aws::String& url) {
+  auto& sender = *request_sender_;
+  sender.setEndpointOverrideUrl(url);
+}
+
+// Forwards the proxy options to the request sender.
+void S3Wrapper::setProxy(const ProxyOptions& proxy) {
+  auto& sender = *request_sender_;
+  sender.setProxy(proxy);
+}
+
+/**
+ * Applies a canned ACL to the put request.
+ * The request is left untouched when canned_acl is empty or is not a key of CANNED_ACL_MAP.
+ */
+void S3Wrapper::setCannedAcl(Aws::S3::Model::PutObjectRequest& request, const std::string& canned_acl) const {
+  const auto acl_entry = CANNED_ACL_MAP.find(canned_acl);
+  if (canned_acl.empty() || acl_entry == CANNED_ACL_MAP.end()) {
+    return;
+  }
+
+  logger_->log_debug("Setting AWS canned ACL [%s]", canned_acl);
+  request.SetACL(acl_entry->second);
+}
+
+/**
+ * Parses an expiration string of the form 'expiry-date="<DATE>", rule-id="<RULEID>"'.
+ * Returns an Expiration built from the two captured values, or a default constructed Expiration
+ * when the pattern does not match or fewer than three results are available.
+ */
+Expiration S3Wrapper::getExpiration(const std::string& expiration) {
+  minifi::utils::Regex expiration_pattern("expiry-date=\"(.*)\", rule-id=\"(.*)\"");
+  const auto matched = expiration_pattern.match(expiration);
+  const auto& groups = expiration_pattern.getResult();
+  if (matched && groups.size() >= 3) {
+    // Index 0 is not used (presumably the whole match); 1 and 2 hold the date and the rule id
+    return Expiration{groups[1], groups[2]};
+  }
+  return Expiration{};
+}
+
+/**
+ * Maps a server side encryption enum value back to its string name.
+ * Returns the key of the first SERVER_SIDE_ENCRYPTION_MAP entry whose value equals encryption,
+ * or an empty string when encryption is NOT_SET or no entry matches.
+ */
+std::string S3Wrapper::getEncryptionString(Aws::S3::Model::ServerSideEncryption encryption) {
+  if (encryption == Aws::S3::Model::ServerSideEncryption::NOT_SET) {
+    return "";
+  }
+
+  // Iterate by const reference so that no map entry (and no key string) is copied during the search
+  for (const auto& entry : SERVER_SIDE_ENCRYPTION_MAP) {
+    if (entry.second == encryption) {
+      return entry.first;
+    }
+  }
+  return "";
+}
+
+/**
+ * Builds a PutObject request from put_object_params, with data_stream as the request body, and sends it
+ * through the request sender.
+ * Returns nullopt when the sender returns no result; otherwise returns the etag, version,
+ * expiration date and server side encryption name taken from the AWS result.
+ * NOTE(review): the two .at() lookups presumably throw when the storage class or encryption name is
+ * missing from the maps - confirm against the map types and the callers' validation.
+ */
+minifi::utils::optional<PutObjectResult> S3Wrapper::putObject(const PutObjectRequestParameters& put_object_params, std::shared_ptr<Aws::IOStream> data_stream) {
+  Aws::S3::Model::PutObjectRequest put_request;
+
+  // Target object and payload
+  put_request.SetBucket(put_object_params.bucket);
+  put_request.SetKey(put_object_params.object_key);
+  put_request.SetBody(data_stream);
+  put_request.SetContentType(put_object_params.content_type);
+  put_request.SetMetadata(put_object_params.user_metadata_map);
+
+  // Storage options
+  put_request.SetStorageClass(STORAGE_CLASS_MAP.at(put_object_params.storage_class));
+  put_request.SetServerSideEncryption(SERVER_SIDE_ENCRYPTION_MAP.at(put_object_params.server_side_encryption));
+
+  // Access control
+  put_request.SetGrantFullControl(put_object_params.fullcontrol_user_list);
+  put_request.SetGrantRead(put_object_params.read_permission_user_list);
+  put_request.SetGrantReadACP(put_object_params.read_acl_user_list);
+  put_request.SetGrantWriteACP(put_object_params.write_acl_user_list);
+  setCannedAcl(put_request, put_object_params.canned_acl);
+
+  auto put_outcome = request_sender_->sendPutObjectRequest(put_request);
+  if (!put_outcome) {
+    return minifi::utils::nullopt;
+  }
+
+  PutObjectResult result;
+  // AWS returns the ETag wrapped in double quotes; the framing quotes are stripped here
+  result.etag = minifi::utils::StringUtils::removeFramingCharacters(put_outcome->GetETag(), '"');
+  result.version = put_outcome->GetVersionId();
+
+  // GetExpiration returns both a date and a rule id in 'expiry-date=\"<DATE>\", rule-id=\"<RULEID>\"' format,
+  // but only the date part is kept in the result
+  const auto parsed_expiration = getExpiration(put_outcome->GetExpiration());
+  result.expiration = parsed_expiration.expiration_time;
+  result.ssealgorithm = getEncryptionString(put_outcome->GetServerSideEncryption());
+  return result;
+}
+
+/**
+ * Sends a DeleteObject request for object_key in bucket through the request sender and returns the sender's result.
+ * The version id is only set on the request when version is not empty.
+ */
+bool S3Wrapper::deleteObject(const std::string& bucket, const std::string& object_key, const std::string& version) {
+  Aws::S3::Model::DeleteObjectRequest delete_request;
+  delete_request.SetBucket(bucket);
+  delete_request.SetKey(object_key);
+
+  const bool has_version = !version.empty();
+  if (has_version) {
+    delete_request.SetVersionId(version);
+  }
+
+  return request_sender_->sendDeleteObjectRequest(delete_request);
+}
+
+/**
+ * Copies data_size bytes from source to output in chunks of at most BUFFER_SIZE bytes.
+ * Returns the number of bytes copied (0 when data_size is not positive), -1 when reading from source fails,
+ * or the negative value returned by output->write when writing fails.
+ * NOTE(review): a non-negative return of output->write is treated as a complete write of the chunk -
+ * confirm that io::BaseStream::write never writes partially.
+ */
+int64_t S3Wrapper::writeFetchedBody(Aws::IOStream& source, const int64_t data_size, const std::shared_ptr<io::BaseStream>& output) {
+  static const uint64_t BUFFER_SIZE = 4096;
+  // The vector has to be sized, not only reserved: reserve() leaves size() at 0, and accessing
+  // buffer.data() beyond size() elements is undefined behaviour
+  std::vector<uint8_t> buffer(BUFFER_SIZE);
 
-  if (outcome.IsSuccess()) {
-      logger_->log_info("Added S3 object '%s' to bucket '%s'", request.GetKey(), request.GetBucket());
-      return outcome.GetResultWithOwnership();
-  } else {
-    logger_->log_error("PutS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
+  int64_t write_size = 0;
+  while (write_size < data_size) {
+    // The last chunk may be shorter than the buffer
+    auto next_write_size = data_size - write_size < BUFFER_SIZE ? data_size - write_size : BUFFER_SIZE;
+    if (!source.read(reinterpret_cast<char*>(buffer.data()), next_write_size)) {
+      return -1;
+    }
+    auto ret = output->write(buffer.data(), next_write_size);
+    if (ret < 0) {
+      return ret;
+    }
+    write_size += next_write_size;
+  }
+  return write_size;
+}
+
+/**
+ * Sends a GetObject request built from get_object_params and copies the returned body into out_body.
+ * Returns nullopt when the request sender returns no result. Otherwise the result is filled by
+ * fillFetchObjectResult and its write_size member holds the return value of writeFetchedBody
+ * (which can be negative when copying the body fails).
+ */
+minifi::utils::optional<GetObjectResult> S3Wrapper::getObject(const GetObjectRequestParameters& get_object_params, const std::shared_ptr<io::BaseStream>& out_body) {
+  auto fetch_request = createFetchObjectRequest<Aws::S3::Model::GetObjectRequest>(get_object_params);
+  auto aws_result = request_sender_->sendGetObjectRequest(fetch_request);
+  if (!aws_result) {
     return minifi::utils::nullopt;
   }
+  auto& fetched_object = aws_result.value();
+  auto result = fillFetchObjectResult<Aws::S3::Model::GetObjectResult, GetObjectResult>(get_object_params, fetched_object);
+  result.write_size = writeFetchedBody(fetched_object.GetBody(), fetched_object.GetContentLength(), out_body);
+  return result;
+}
+
+/**
+ * Appends the attributes of the listed object versions to listed_objects.
+ * A version is skipped (with a debug log) when its last modification time in milliseconds is greater than
+ * last_bucket_list_timestamp_ - min_object_age.
+ * NOTE(review): min_object_age is unsigned, so the subtraction presumably wraps around if it exceeds
+ * last_bucket_list_timestamp_ - confirm against the member's type and the callers.
+ */
+void S3Wrapper::addListResults(const Aws::Vector<Aws::S3::Model::ObjectVersion>& content, const uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects) {
+  for (const auto& object_version : content) {
+    const auto modified_millis = object_version.GetLastModified().Millis();
+    if (last_bucket_list_timestamp_ - min_object_age < modified_millis) {
+      logger_->log_debug("Object version '%s' of key '%s' skipped due to minimum object age filter", object_version.GetVersionId(), object_version.GetKey());
+      continue;
+    }
+
+    ListedObjectAttributes attributes;
+    attributes.filename = object_version.GetKey();
+    attributes.version = object_version.GetVersionId();
+    attributes.is_latest = object_version.GetIsLatest();
+    // The ETag is stored without its framing double quotes
+    attributes.etag = minifi::utils::StringUtils::removeFramingCharacters(object_version.GetETag(), '"');
+    attributes.last_modified = modified_millis;
+    attributes.length = object_version.GetSize();
+    attributes.store_class = VERSION_STORAGE_CLASS_MAP.at(object_version.GetStorageClass());
+    listed_objects.push_back(std::move(attributes));
+  }
 }
 
-bool S3Wrapper::sendDeleteObjectRequest(const Aws::S3::Model::DeleteObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  Aws::S3::Model::DeleteObjectOutcome outcome = s3_client.DeleteObject(request);
+/**
+ * Appends the attributes of the listed objects to listed_objects; every entry is marked as the latest version
+ * and its version member is left unset.
+ * An object is skipped (with a debug log) when its last modification time in milliseconds is greater than
+ * last_bucket_list_timestamp_ - min_object_age.
+ * NOTE(review): min_object_age is unsigned, so the subtraction presumably wraps around if it exceeds
+ * last_bucket_list_timestamp_ - confirm against the member's type and the callers.
+ */
+void S3Wrapper::addListResults(const Aws::Vector<Aws::S3::Model::Object>& content, const uint64_t min_object_age, std::vector<ListedObjectAttributes>& listed_objects) {
+  for (const auto& listed_object : content) {
+    const auto modified_millis = listed_object.GetLastModified().Millis();
+    if (last_bucket_list_timestamp_ - min_object_age < modified_millis) {
+      logger_->log_debug("Object with key '%s' skipped due to minimum object age filter", listed_object.GetKey());
+      continue;
+    }
 
-  if (outcome.IsSuccess()) {
-    logger_->log_info("Deleted S3 object '%s' from bucket '%s'", request.GetKey(), request.GetBucket());
-    return true;
-  } else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) {
-    logger_->log_info("S3 object '%s' was not found in bucket '%s'", request.GetKey(), request.GetBucket());
-    return true;
-  } else {
-    logger_->log_error("DeleteS3Object failed with the following: '%s'", outcome.GetError().GetMessage());
-    return false;
+    ListedObjectAttributes attributes;
+    attributes.filename = listed_object.GetKey();
+    attributes.is_latest = true;
+    // The ETag is stored without its framing double quotes
+    attributes.etag = minifi::utils::StringUtils::removeFramingCharacters(listed_object.GetETag(), '"');
+    attributes.last_modified = modified_millis;
+    attributes.length = listed_object.GetSize();
+    attributes.store_class = OBJECT_STORAGE_CLASS_MAP.at(listed_object.GetStorageClass());
+    listed_objects.push_back(std::move(attributes));
   }
 }
 
-minifi::utils::optional<Aws::S3::Model::GetObjectResult> S3Wrapper::sendGetObjectRequest(const Aws::S3::Model::GetObjectRequest& request) {
-  Aws::S3::S3Client s3_client(credentials_, client_config_);
-  auto outcome = s3_client.GetObject(request);
+minifi::utils::optional<std::vector<ListedObjectAttributes>> S3Wrapper::listVersions(const ListRequestParameters& params) {
+  auto request = createListRequest<Aws::S3::Model::ListObjectVersionsRequest>(params);
+  std::vector<ListedObjectAttributes> attribute_list;
+  nonstd::optional_lite::optional<Aws::S3::Model::ListObjectVersionsResult> aws_result;
+  do {
+    aws_result = request_sender_->sendListVersionsRequest(request);
+    if (!aws_result) {
+      return minifi::utils::nullopt;
+    }
+    const auto& versions = aws_result->GetVersions();
+    logger_->log_debug("AWS S3 List operation returned %d versions. This result is truncated: %s", versions.size(), aws_result->GetIsTruncated() ? "true" : "false");

Review comment:
       Fixed in [d1af764](https://github.com/apache/nifi-minifi-cpp/pull/975/commits/d1af764a93439b099a8bee23f6d397772eefc0ff)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org