Posted to commits@doris.apache.org by "morningman (via GitHub)" <gi...@apache.org> on 2023/04/21 06:12:21 UTC

[GitHub] [doris] morningman commented on a diff in pull request #17585: [feature](io) enable s3 file writer with multi part uploading concurrently

morningman commented on code in PR #17585:
URL: https://github.com/apache/doris/pull/17585#discussion_r1173326302


##########
be/src/common/config.h:
##########
@@ -941,6 +941,9 @@ CONF_mInt64(max_tablet_io_errors, "-1");
 
 // Page size of row column, default 4KB
 CONF_mInt64(row_column_page_size, "4096");
+// it must be larger than or equal to 5MB
+CONF_mInt32(s3_write_buffer_size, "5242880");
+CONF_mInt32(s3_write_buffer_whole_size, "524288000");

Review Comment:
   What does `s3_write_buffer_whole_size` mean? Please add a comment in the code.
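   For illustration, the requested comment might look something like the sketch below. The exact meaning of `s3_write_buffer_whole_size` would need to be confirmed by the author; the default of 524288000 bytes (500 MiB, i.e. 100 parts of 5 MiB) suggests it caps the total memory of the shared write buffer pool:

   ```cpp
   // Buffer size of a single S3 multipart-upload part.
   // Must be >= 5 MiB, the minimum part size S3 accepts for multipart uploads.
   CONF_mInt32(s3_write_buffer_size, "5242880");
   // Presumed: upper bound on the total memory used by the shared S3 write
   // buffer pool across all concurrent writers (default 500 MiB = 100 parts).
   CONF_mInt32(s3_write_buffer_whole_size, "524288000");
   ```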



##########
be/src/io/fs/s3_file_system.cpp:
##########
@@ -98,7 +98,9 @@ Status S3FileSystem::connect_impl() {
 
 Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer) {
     GET_KEY(key, file);
-    *writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf, getSPtr());
+    auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;

Review Comment:
   Why change this behavior?



##########
be/src/io/fs/s3_file_writer.cpp:
##########
@@ -17,221 +17,232 @@
 
 #include "io/fs/s3_file_writer.h"
 
-#include <aws/core/Aws.h>
 #include <aws/core/utils/HashingUtils.h>
+#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
+#include <aws/core/utils/memory/stl/AWSStringStream.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/AbortMultipartUploadRequest.h>
 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
+#include <aws/s3/model/CompletedPart.h>
 #include <aws/s3/model/CreateMultipartUploadRequest.h>
-#include <aws/s3/model/DeleteObjectRequest.h>
-#include <aws/s3/model/DeleteObjectsRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
-#include <fmt/core.h>
-#include <sys/uio.h>
 
-#include <cerrno>
+#include <atomic>
+#include <memory>
+#include <utility>
 
-#include "common/compiler_util.h"
+#include "common/logging.h"
 #include "common/status.h"
-#include "gutil/macros.h"
+#include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
-#include "io/fs/path.h"
 #include "io/fs/s3_file_system.h"
-#include "util/doris_metrics.h"
-
-using Aws::S3::Model::AbortMultipartUploadRequest;
-using Aws::S3::Model::CompletedPart;
-using Aws::S3::Model::CompletedMultipartUpload;
-using Aws::S3::Model::CompleteMultipartUploadRequest;
-using Aws::S3::Model::CreateMultipartUploadRequest;
-using Aws::S3::Model::DeleteObjectRequest;
-using Aws::S3::Model::UploadPartRequest;
-using Aws::S3::Model::UploadPartOutcome;
+#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/runtime_profile.h"
+#include "util/s3_util.h"
 
 namespace doris {
 namespace io {
+using namespace Aws::S3::Model;
+using Aws::S3::S3Client;
 
-// max size of each part when uploading: 5MB
-static const int MAX_SIZE_EACH_PART = 5 * 1024 * 1024;
-static const char* STREAM_TAG = "S3FileWriter";
-
-S3FileWriter::S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client,
+S3FileWriter::S3FileWriter(Path&& path, std::string&& key, std::shared_ptr<S3Client> client,
                            const S3Conf& s3_conf, FileSystemSPtr fs)
-        : FileWriter(std::move(path), fs), _client(client), _s3_conf(s3_conf) {
-    DorisMetrics::instance()->s3_file_open_writing->increment(1);
-    DorisMetrics::instance()->s3_file_writer_total->increment(1);
-}
+        : FileWriter(std::move(path), std::move(fs)),
+          _bucket(s3_conf.bucket),
+          _key(std::move(key)),
+          _client(std::move(client)) {}
 
 S3FileWriter::~S3FileWriter() {
-    if (_opened) {
-        close();
+    if (!_closed) {
+        abort();
     }
     CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
 }
 
-Status S3FileWriter::close() {
-    return _close();
+Status S3FileWriter::open() {
+    VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
+    CreateMultipartUploadRequest create_request;
+    create_request.WithBucket(_bucket).WithKey(_key);
+    create_request.SetContentType("text/plain");
+
+    auto outcome = _client->CreateMultipartUpload(create_request);
+
+    if (outcome.IsSuccess()) {
+        _upload_id = outcome.GetResult().GetUploadId();
+        _closed = false;
+        _opened = true;
+        return Status::OK();
+    }
+    return Status::IOError("failed to create multipart upload(bucket={}, key={}, upload_id={}): {}",
+                           _bucket, _path.native(), _upload_id, outcome.GetError().GetMessage());
 }
 
 Status S3FileWriter::abort() {
+    _failed = true;
+    if (_closed || !_opened) {
+        return Status::OK();
+    }
+    VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
+    _closed = true;
+    _wait.wait();

Review Comment:
   Check the return value of `wait()`
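   Assuming `wait()` is changed to return whether all in-flight parts finished before the timeout (see the comment on `WaitGroup::wait` below), `abort()` could surface a timed-out wait instead of silently continuing; a rough sketch:

   ```cpp
   // Sketch: propagate a timeout from wait() rather than ignoring it.
   // Assumes wait() returns true when all parts finished before the deadline.
   if (!_wait.wait()) {
       return Status::IOError(
               "timed out waiting for in-flight uploads to stop (bucket={}, key={})",
               _bucket, _key);
   }
   ```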



##########
be/src/io/fs/s3_file_writer.h:
##########
@@ -33,38 +34,93 @@ class S3Client;
 
 namespace doris {
 namespace io {
+struct S3FileBuffer;
 
 class S3FileWriter final : public FileWriter {
 public:
-    S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf,
-                 FileSystemSPtr fs);
+    S3FileWriter(Path&& path, std::string&& key, std::shared_ptr<Aws::S3::S3Client> client,
+                 const S3Conf& s3_conf, FileSystemSPtr fs);
     ~S3FileWriter() override;
 
+    Status open();
+
     Status close() override;
+
     Status abort() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
     Status write_at(size_t offset, const Slice& data) override {
         return Status::NotSupported("not support");
     }
 
-private:
-    Status _close();
-    Status _open();
-    Status _upload_part();
-    void _reset_stream();
+    size_t bytes_appended() const { return _bytes_appended; }
+
+    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
+    class WaitGroup {
+    public:
+        WaitGroup() = default;
+
+        ~WaitGroup() = default;
+
+        WaitGroup(const WaitGroup&) = delete;
+        WaitGroup(WaitGroup&&) = delete;
+        void operator=(const WaitGroup&) = delete;
+        void operator=(WaitGroup&&) = delete;
+        // add one counter indicating one more concurrent worker
+        void add(int count = 1) { _count += count; }
+
+        // decrese count if one concurrent worker finished it's work

Review Comment:
   ```suggestion
        // decrease count if one concurrent worker finished its work
   ```



##########
be/src/io/fs/s3_file_writer.h:
##########
@@ -33,38 +34,93 @@ class S3Client;
 
 namespace doris {
 namespace io {
+struct S3FileBuffer;
 
 class S3FileWriter final : public FileWriter {
 public:
-    S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf,
-                 FileSystemSPtr fs);
+    S3FileWriter(Path&& path, std::string&& key, std::shared_ptr<Aws::S3::S3Client> client,
+                 const S3Conf& s3_conf, FileSystemSPtr fs);
     ~S3FileWriter() override;
 
+    Status open();
+
     Status close() override;
+
     Status abort() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
     Status write_at(size_t offset, const Slice& data) override {
         return Status::NotSupported("not support");
     }
 
-private:
-    Status _close();
-    Status _open();
-    Status _upload_part();
-    void _reset_stream();
+    size_t bytes_appended() const { return _bytes_appended; }
+
+    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
+    class WaitGroup {
+    public:
+        WaitGroup() = default;
+
+        ~WaitGroup() = default;
+
+        WaitGroup(const WaitGroup&) = delete;
+        WaitGroup(WaitGroup&&) = delete;
+        void operator=(const WaitGroup&) = delete;
+        void operator=(WaitGroup&&) = delete;
+        // add one counter indicating one more concurrent worker
+        void add(int count = 1) { _count += count; }
+
+        // decrese count if one concurrent worker finished it's work
+        void done() {
+            _count--;
+            if (_count.load() <= 0) {
+                _cv.notify_all();
+            }
+        }
+
+        // wait for all concurrent workers finish their work
+        // would return if timeout, default timeout would be 5min
+        void wait(int64_t timeout_seconds = 300) {

Review Comment:
   Need a return value? Otherwise we can't tell whether it finished or timed out.
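   One possible shape, assuming the class holds a `std::mutex _lock` used with `_cv` (the mutex member is not visible in this hunk):

   ```cpp
   // Sketch: return whether all workers finished before the timeout, so the
   // caller can distinguish normal completion from a timed-out wait.
   bool wait(int64_t timeout_seconds = 300) {
       if (_count.load() <= 0) {
           return true;
       }
       std::unique_lock<std::mutex> lck {_lock};
       _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
                    [this]() { return _count.load() <= 0; });
       return _count.load() <= 0;
   }
   ```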



##########
be/src/io/fs/s3_file_writer.cpp:
##########
@@ -17,221 +17,232 @@
 
 #include "io/fs/s3_file_writer.h"
 
-#include <aws/core/Aws.h>
 #include <aws/core/utils/HashingUtils.h>
+#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
+#include <aws/core/utils/memory/stl/AWSStringStream.h>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/AbortMultipartUploadRequest.h>
 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
+#include <aws/s3/model/CompletedPart.h>
 #include <aws/s3/model/CreateMultipartUploadRequest.h>
-#include <aws/s3/model/DeleteObjectRequest.h>
-#include <aws/s3/model/DeleteObjectsRequest.h>
-#include <aws/s3/model/GetObjectRequest.h>
 #include <aws/s3/model/UploadPartRequest.h>
-#include <fmt/core.h>
-#include <sys/uio.h>
 
-#include <cerrno>
+#include <atomic>
+#include <memory>
+#include <utility>
 
-#include "common/compiler_util.h"
+#include "common/logging.h"
 #include "common/status.h"
-#include "gutil/macros.h"
+#include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
-#include "io/fs/path.h"
 #include "io/fs/s3_file_system.h"
-#include "util/doris_metrics.h"
-
-using Aws::S3::Model::AbortMultipartUploadRequest;
-using Aws::S3::Model::CompletedPart;
-using Aws::S3::Model::CompletedMultipartUpload;
-using Aws::S3::Model::CompleteMultipartUploadRequest;
-using Aws::S3::Model::CreateMultipartUploadRequest;
-using Aws::S3::Model::DeleteObjectRequest;
-using Aws::S3::Model::UploadPartRequest;
-using Aws::S3::Model::UploadPartOutcome;
+#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/runtime_profile.h"
+#include "util/s3_util.h"
 
 namespace doris {
 namespace io {
+using namespace Aws::S3::Model;
+using Aws::S3::S3Client;
 
-// max size of each part when uploading: 5MB
-static const int MAX_SIZE_EACH_PART = 5 * 1024 * 1024;
-static const char* STREAM_TAG = "S3FileWriter";
-
-S3FileWriter::S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client,
+S3FileWriter::S3FileWriter(Path&& path, std::string&& key, std::shared_ptr<S3Client> client,
                            const S3Conf& s3_conf, FileSystemSPtr fs)
-        : FileWriter(std::move(path), fs), _client(client), _s3_conf(s3_conf) {
-    DorisMetrics::instance()->s3_file_open_writing->increment(1);
-    DorisMetrics::instance()->s3_file_writer_total->increment(1);
-}
+        : FileWriter(std::move(path), std::move(fs)),
+          _bucket(s3_conf.bucket),
+          _key(std::move(key)),
+          _client(std::move(client)) {}
 
 S3FileWriter::~S3FileWriter() {
-    if (_opened) {
-        close();
+    if (!_closed) {
+        abort();

Review Comment:
   You changed `close()` to `abort()` here. Does that mean we must call `close()` explicitly, or else the write operation may be aborted?
   If so, better to add a comment to S3FileWriter.
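   If that is the intended contract, a class comment plus a usage sketch would make it explicit. Hypothetical caller code, for illustration only:

   ```cpp
   // S3FileWriter must be close()d explicitly on the success path; if the
   // writer is destroyed while still open, the destructor aborts the
   // multipart upload and all uploaded parts are discarded.
   io::FileWriterPtr writer;
   RETURN_IF_ERROR(fs->create_file(path, &writer));
   RETURN_IF_ERROR(writer->appendv(slices, slice_cnt));
   RETURN_IF_ERROR(writer->close()); // without this, ~S3FileWriter() calls abort()
   ```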



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

