You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by "github-actions[bot] (via GitHub)" <gi...@apache.org> on 2023/04/04 03:52:48 UTC

[GitHub] [doris] github-actions[bot] commented on a diff in pull request #17585: [feature](io) enable s3 file writer with multi part uploading concurrently

github-actions[bot] commented on code in PR #17585:
URL: https://github.com/apache/doris/pull/17585#discussion_r1156699107


##########
be/src/io/fs/s3_file_write_bufferpool.h:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <fstream>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "io/fs/s3_common.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
+struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {

Review Comment:
   warning: 'S3FileBuffer' defined as a struct here but previously declared as a class; this is valid, but may result in linker errors under the Microsoft C++ ABI [clang-diagnostic-mismatched-tags]
   ```cpp
   struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
   ^
   ```
   **be/src/io/fs/s3_file_writer.h:36:** did you mean struct here?
   ```cpp
   class S3FileBuffer;
   ^
   ```
   



##########
be/src/io/fs/s3_file_writer.h:
##########
@@ -33,38 +34,94 @@ class S3Client;
 
 namespace doris {
 namespace io {
+class S3FileBuffer;
 
 class S3FileWriter final : public FileWriter {
 public:
-    S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf,
+    S3FileWriter(Path&& path, std::string&& key, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf,
                  FileSystemSPtr fs);
     ~S3FileWriter() override;
 
+    Status open() ;
+
     Status close() override;
+
     Status abort() override;
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
     Status write_at(size_t offset, const Slice& data) override {
         return Status::NotSupported("not support");
     }
 
-private:
-    Status _close();
-    Status _open();
-    Status _upload_part();
-    void _reset_stream();
+    size_t bytes_appended() const  { return _bytes_appended; }
+
+    int64_t upload_cost_ms() const { return *_upload_cost_ms; }
 
 private:
+    // Counts outstanding concurrent workers and lets a thread block until the
+    // count drops to zero or a timeout elapses. Neither copyable nor movable.
+    class WaitGroup {
+    public:
+        WaitGroup() = default;
+
+        ~WaitGroup() = default;
+
+        WaitGroup(const WaitGroup&) = delete;
+        WaitGroup(WaitGroup&&) = delete;
+        void operator=(const WaitGroup&) = delete;
+        void operator=(WaitGroup&&) = delete;
+        // Register `count` more concurrent workers (one by default).
+        void add(int count = 1) { _count += count; }
+
+        // Decrease the count when one concurrent worker has finished its work.
+        // The decrement and the notification happen while holding _lock:
+        // otherwise a thread inside wait() could evaluate the predicate, see a
+        // non-zero count, and miss a notification sent before it starts
+        // blocking, leaving it asleep until the timeout expires.
+        void done() {
+            std::lock_guard<std::mutex> guard {_lock};
+            if (--_count <= 0) {
+                _cv.notify_all();
+            }
+        }
+
+        // Wait for all concurrent workers to finish their work.
+        // Returns once the count is <= 0 or after `timeout_seconds` (default
+        // 300, i.e. 5 minutes), whichever comes first; the caller is not told
+        // which of the two happened.
+        void wait(int64_t timeout_seconds = 300) {
+            // Always take the lock, even when the count is already zero, so
+            // wait() cannot return while done() is still inside its critical
+            // section. wait_for tests the predicate before blocking, so an
+            // already-finished group still returns immediately.
+            std::unique_lock<std::mutex> lck {_lock};
+            _cv.wait_for(lck, std::chrono::seconds(timeout_seconds),
+                         [this]() { return _count.load() <= 0; });
+        }
+
+    private:
+        // Guards the wake-up handshake between done() and wait().
+        std::mutex _lock;
+        std::condition_variable _cv;
+        // Number of workers that have been add()ed but have not called done().
+        std::atomic_int64_t _count {0};
+    };
+    Status _complete();
+    void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
+
+    std::string _bucket;
+    std::string _key;
+    bool _closed = true;
+    bool _opened = false;
+
+    std::shared_ptr<int64_t> _upload_cost_ms;
+
     std::shared_ptr<Aws::S3::S3Client> _client;
-    S3Conf _s3_conf;
     std::string _upload_id;
-    bool _is_open = false;
-    bool _closed = false;
+    size_t _bytes_appended {0};
+    size_t _index_offset {0};

Review Comment:
   warning: private field '_index_offset' is not used [clang-diagnostic-unused-private-field]
   ```cpp
       size_t _index_offset {0};
              ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org