You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/29 11:03:56 UTC

[doris] branch master updated: [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 41ccf77c7d [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
41ccf77c7d is described below

commit 41ccf77c7d0409fe3053829c66f7c205e4c03c6e
Author: zhangdong <49...@qq.com>
AuthorDate: Thu Jun 29 19:03:49 2023 +0800

    [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
    
    1.
    Fix a bug where a field of the S3 file write buffer pool (s3_file_write_bufferpool) was not initialized, causing undefined behavior.
    
    2.
    Add an fs_s3 benchmark tool; for usage of the tools, see https://github.com/apache/doris/pull/20770.
    Also improve the benchmark output format:
    
    `sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=single_read --threads=1 --iterations=1`
    
    ```
    ------------------------------------------------------------------------------------------------------------------------------
    Benchmark                                                                    Time             CPU   Iterations UserCounters...
    ------------------------------------------------------------------------------------------------------------------------------
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1              7366 ms          123 ms            1 ReadRate(B/S)=12.1823M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1              6163 ms          116 ms            1 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1              6048 ms          110 ms            1 ReadRate(B/S)=14.8366M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_mean         6526 ms          116 ms            3 ReadRate(B/S)=13.8596M/s ReadTime(S)=6.52556 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_median       6163 ms          116 ms            3 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_stddev        730 ms         6.68 ms            3 ReadRate(B/S)=1.45914M/s ReadTime(S)=0.729876 ReadTotal(B)=0
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_cv          11.18 %          5.75 %             3 ReadRate(B/S)=10.53% ReadTime(S)=11.18% ReadTotal(B)=0.00%
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_max          7366 ms          123 ms            3 ReadRate(B/S)=14.8366M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
    S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_min          6048 ms          110 ms            3 ReadRate(B/S)=12.1823M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
    ```
---
 be/src/io/fs/benchmark/base_benchmark.h      | 117 +++++++++++++++-
 be/src/io/fs/benchmark/benchmark_factory.hpp |  14 +-
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp |  11 ++
 be/src/io/fs/benchmark/hdfs_benchmark.hpp    | 166 +++++------------------
 be/src/io/fs/benchmark/s3_benchmark.hpp      | 192 ++++++++++++++++++++++++---
 be/src/io/fs/s3_file_write_bufferpool.cpp    |  22 +--
 be/src/io/fs/s3_file_write_bufferpool.h      |  14 +-
 be/src/service/doris_main.cpp                |   7 +
 8 files changed, 371 insertions(+), 172 deletions(-)

diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h
index c28ad02de5..41dae7cea2 100644
--- a/be/src/io/fs/benchmark/base_benchmark.h
+++ b/be/src/io/fs/benchmark/base_benchmark.h
@@ -27,6 +27,9 @@
 #include <vector>
 
 #include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "util/slice.h"
 
 namespace doris::io {
 
@@ -44,24 +47,22 @@ void bm_log(const std::string& fmt, Args&&... args) {
 class BaseBenchmark {
 public:
     BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size,
-                  int repetitions, const std::map<std::string, std::string>& conf_map)
+                  const std::map<std::string, std::string>& conf_map)
             : _name(name),
               _threads(threads),
               _iterations(iterations),
               _file_size(file_size),
-              _repetitions(repetitions),
               _conf_map(conf_map) {}
     virtual ~BaseBenchmark() = default;
 
     virtual Status init() { return Status::OK(); }
     virtual Status run(benchmark::State& state) { return Status::OK(); }
 
+    void set_repetition(int rep) { _repetitions = rep; }
+
     void register_bm() {
         auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) {
-            Status st;
-            if (state.thread_index() == 0) {
-                st = this->init();
-            }
+            Status st = this->init();
             if (st != Status::OK()) {
                 bm_log("Benchmark {} init error: {}", _name, st.to_string());
                 return;
@@ -92,12 +93,127 @@ public:
         });
     }
 
+    virtual std::string get_file_path(benchmark::State& state) {
+        std::string base_dir = _conf_map["base_dir"];
+        std::string file_path;
+        if (base_dir.ends_with("/")) {
+            file_path = fmt::format("{}test_{}", base_dir, state.thread_index());
+        } else {
+            file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+        }
+        bm_log("file_path: {}", file_path);
+        return file_path;
+    }
+
+    // Reads up to `_file_size` bytes from `reader` (the whole file when `_file_size` is 0)
+    // in chunks of "buffer_size" bytes (conf key, default 1000000) and reports the elapsed
+    // time and the number of bytes actually read as benchmark counters. `reader` must be
+    // non-null. The reader is closed even if a read fails; the first error is returned.
+    Status read(benchmark::State& state, FileReaderSPtr reader) {
+        bm_log("begin to read {}", _name);
+        size_t buffer_size =
+                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+        std::vector<char> buffer;
+        buffer.resize(buffer_size);
+        doris::Slice data = {buffer.data(), buffer.size()};
+        size_t offset = 0;
+        size_t bytes_read = 0;
+
+        size_t read_size = reader->size();
+        if (_file_size > 0) {
+            read_size = std::min(read_size, _file_size);
+        }
+        long remaining_size = read_size;
+
+        Status status;
+        auto start = std::chrono::high_resolution_clock::now();
+        while (remaining_size > 0) {
+            bytes_read = 0;
+            size_t size = std::min(buffer_size, (size_t)remaining_size);
+            data.size = size;
+            status = reader->read_at(offset, data, &bytes_read);
+            // bytes_read is unsigned, so only the returned status can signal a failure.
+            if (!status.ok()) {
+                bm_log("reader read_at error: {}", status.to_string());
+                break;
+            }
+            if (bytes_read == 0) { // EOF
+                break;
+            }
+            offset += bytes_read;
+            remaining_size -= bytes_read;
+        }
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        // `offset` is the number of bytes actually read; it is below `read_size` when the
+        // loop stopped early because of an error or a premature EOF.
+        state.counters["ReadRate(B/S)"] = benchmark::Counter(offset, benchmark::Counter::kIsRate);
+        state.counters["ReadTotal(B)"] = offset;
+        state.counters["ReadTime(S)"] = elapsed_seconds.count();
+
+        // Close unconditionally so a failed read does not leave the reader open.
+        Status close_status = reader->close();
+        if (status.ok()) {
+            status = close_status;
+        }
+        bm_log("finish to read {}, size {}, seconds: {}, status: {}", _name, offset,
+               elapsed_seconds.count(), status);
+        return status;
+    }
+
+    // Appends `_file_size` bytes to `writer` in chunks of "buffer_size" bytes (conf key,
+    // default 1000000) and closes it if every append succeeded. The measured time includes
+    // close(); the counters report the bytes actually appended. `writer` must be non-null.
+    Status write(benchmark::State& state, FileWriter* writer) {
+        bm_log("begin to write {}, size: {}", _name, _file_size);
+        size_t write_size = _file_size;
+        size_t buffer_size =
+                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+        long remaining_size = write_size;
+        std::vector<char> buffer;
+        buffer.resize(buffer_size);
+        doris::Slice data = {buffer.data(), buffer.size()};
+
+        Status status;
+        auto start = std::chrono::high_resolution_clock::now();
+        while (remaining_size > 0) {
+            size_t size = std::min(buffer_size, (size_t)remaining_size);
+            data.size = size;
+            status = writer->append(data);
+            if (!status.ok()) {
+                bm_log("writer append error: {}", status.to_string());
+                break;
+            }
+            remaining_size -= size;
+        }
+        if (status.ok()) {
+            status = writer->close();
+        }
+
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        // Bytes actually appended; below `write_size` when an append failed.
+        size_t written_size = write_size - remaining_size;
+        state.counters["WriteRate(B/S)"] =
+                benchmark::Counter(written_size, benchmark::Counter::kIsRate);
+        state.counters["WriteTotal(B)"] = written_size;
+        state.counters["WriteTime(S)"] = elapsed_seconds.count();
+
+        bm_log("finish to write {}, size: {}, seconds: {}, status: {}", _name, written_size,
+               elapsed_seconds.count(), status);
+        return status;
+    }
+
 protected:
     std::string _name;
     int _threads;
     int _iterations;
     size_t _file_size;
-    int _repetitions = 1;
+    int _repetitions = 3;
     std::map<std::string, std::string> _conf_map;
 };

diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp
index 3e8c9314ca..0b8af3b96b 100644
--- a/be/src/io/fs/benchmark/benchmark_factory.hpp
+++ b/be/src/io/fs/benchmark/benchmark_factory.hpp
@@ -38,8 +38,18 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
                                 const std::map<std::string, std::string>& conf_map,
                                 BaseBenchmark** bm) {
     if (fs_type == "s3") {
-        if (op_type == "read") {
-            *bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map);
+        if (op_type == "create_write") { // writes base_dir/test_<thread index>
+            *bm = new S3CreateWriteBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "open_read") { // reads base_dir/test_<thread index>
+            *bm = new S3OpenReadBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "single_read") { // reads the conf "file_path" object
+            *bm = new S3SingleReadBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "rename") { // <path> -> <path>_new; succeeds once per file
+            *bm = new S3RenameBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "exists") {
+            *bm = new S3ExistsBenchmark(threads, iterations, file_size, conf_map);
+        } else if (op_type == "list") { // lists the conf "base_dir" recursively
+            *bm = new S3ListBenchmark(threads, iterations, file_size, conf_map);
         } else {
             return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                     "unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index a5be5db80a..50085ae1e7 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -20,6 +20,8 @@
 #include <fstream>
 
 #include "io/fs/benchmark/benchmark_factory.hpp"
+#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/threadpool.h"
 
 DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
 DEFINE_string(operation, "create_write",
@@ -107,6 +109,22 @@ int main(int argc, char** argv) {
         return 1;
     }
 
+    // S3 write buffers from S3FileBufferPool are uploaded on this thread pool; the pool
+    // holds 500MB in total, split into 5MB buffers (init() DCHECKs buffers >= 5MB).
+    std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
+    doris::Status thread_pool_status =
+            doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+                    .set_min_threads(16)
+                    .set_max_threads(64)
+                    .build(&buffered_reader_prefetch_thread_pool);
+    if (!thread_pool_status.ok()) {
+        doris::io::bm_log("failed to create s3 upload thread pool: {}",
+                          thread_pool_status.to_string());
+        return 1;
+    }
+    doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+    s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());
+
     try {
         doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
                                            std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size),
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index 1307ddc95a..b508e14a24 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -33,75 +33,32 @@ class HdfsOpenReadBenchmark : public BaseBenchmark {
 public:
     HdfsOpenReadBenchmark(int threads, int iterations, size_t file_size,
                           const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, 3, conf_map) {}
+            : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsOpenReadBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
-    virtual std::string get_file_path(benchmark::State& state) {
-        std::string base_dir = _conf_map["base_dir"];
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-        return file_path;
+    virtual void set_default_file_size() {
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
     }
 
     Status run(benchmark::State& state) override {
+        // Nothing else calls this hook, so apply the per-benchmark default size here.
+        set_default_file_size();
+        auto file_path = get_file_path(state);
+
+        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::FileSystem> fs;
         io::FileReaderSPtr reader;
-        bm_log("begin to init {}", _name);
-        size_t buffer_size =
-                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
         io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
         THdfsParams hdfs_params = parse_properties(_conf_map);
-
-        auto file_path = get_file_path(state);
         RETURN_IF_ERROR(
                 FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
-        bm_log("finish to init {}", _name);
-
-        bm_log("begin to run {}", _name);
-        Status status;
-        std::vector<char> buffer;
-        buffer.resize(buffer_size);
-        doris::Slice data = {buffer.data(), buffer.size()};
-        size_t offset = 0;
-        size_t bytes_read = 0;
-
-        size_t read_size = reader->size();
-        if (_file_size > 0) {
-            read_size = std::min(read_size, _file_size);
-        }
-        long remaining_size = read_size;
-
-        auto start = std::chrono::high_resolution_clock::now();
-        while (remaining_size > 0) {
-            bytes_read = 0;
-            size_t size = std::min(buffer_size, (size_t)remaining_size);
-            data.size = size;
-            status = reader->read_at(offset, data, &bytes_read);
-            if (status != Status::OK() || bytes_read < 0) {
-                bm_log("reader read_at error: {}", status.to_string());
-                break;
-            }
-            if (bytes_read == 0) { // EOF
-                break;
-            }
-            offset += bytes_read;
-            remaining_size -= bytes_read;
-        }
-        bm_log("finish to run {}", _name);
         auto end = std::chrono::high_resolution_clock::now();
-
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        state.counters["ReadRate"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
-
-        if (reader != nullptr) {
-            reader->close();
-        }
-        return status;
+        state.counters["OpenReaderTime(S)"] = elapsed_seconds.count();
+        return read(state, reader);
     }
 };
 
@@ -113,6 +70,10 @@ public:
             : HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map) {}
     virtual ~HdfsSingleReadBenchmark() = default;
 
+    virtual void set_default_file_size() override {
+        // do nothing, default is 0, which means it will read the whole file
+    }
+
     virtual std::string get_file_path(benchmark::State& state) override {
         std::string file_path = _conf_map["file_path"];
         bm_log("file_path: {}", file_path);
@@ -124,56 +85,20 @@ class HdfsCreateWriteBenchmark : public BaseBenchmark {
 public:
     HdfsCreateWriteBenchmark(int threads, int iterations, size_t file_size,
                              const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, 3,
-                            conf_map) {}
+            : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsCreateWriteBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-
-        auto start = std::chrono::high_resolution_clock::now();
+        auto file_path = get_file_path(state);
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
         std::shared_ptr<io::HdfsFileSystem> fs;
         io::FileWriterPtr writer;
+        THdfsParams hdfs_params = parse_properties(_conf_map);
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
         RETURN_IF_ERROR(fs->create_file(file_path, &writer));
-        Status status;
-        size_t write_size = _file_size;
-        size_t buffer_size =
-                _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
-        long remaining_size = write_size;
-        std::vector<char> buffer;
-        buffer.resize(buffer_size);
-        doris::Slice data = {buffer.data(), buffer.size()};
-        while (remaining_size > 0) {
-            size_t size = std::min(buffer_size, (size_t)remaining_size);
-            data.size = size;
-            status = writer->append(data);
-            if (status != Status::OK()) {
-                bm_log("writer append error: {}", status.to_string());
-                break;
-            }
-            remaining_size -= size;
-        }
-        auto end = std::chrono::high_resolution_clock::now();
-        auto elapsed_seconds =
-                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
-        state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
-        state.counters["WriteRate"] = benchmark::Counter(write_size, benchmark::Counter::kIsRate);
-
-        if (writer != nullptr) {
-            writer->close();
-        }
-        return status;
+        return write(state, writer.get());
     }
 };
 
@@ -181,75 +106,56 @@ class HdfsRenameBenchmark : public BaseBenchmark {
 public:
     HdfsRenameBenchmark(int threads, int iterations, size_t file_size,
                         const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, 1, conf_map) {}
+            : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, conf_map) {
+        // a file can be renamed only once: force one iteration and one repetition
+        set_repetition(1);
+    }
     virtual ~HdfsRenameBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+        auto file_path = get_file_path(state);
+        auto new_file_path = file_path + "_new";
         THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
-
-        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
-        io::FileWriterPtr writer;
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
         RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
         state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
         state.counters["RenameCost"] =
                 benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
 
-        if (writer != nullptr) {
-            writer->close();
-        }
         return Status::OK();
     }
-
-private:
 };
 
 class HdfsExistsBenchmark : public BaseBenchmark {
 public:
     HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
                         const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, 3, conf_map) {}
+            : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, conf_map) {}
     virtual ~HdfsExistsBenchmark() = default;
 
-    Status init() override { return Status::OK(); }
-
     Status run(benchmark::State& state) override {
-        bm_log("begin to run {}", _name);
-        std::string base_dir = _conf_map["base_dir"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
-        THdfsParams hdfs_params = parse_properties(_conf_map);
-        auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
-        bm_log("file_path: {}", file_path);
+        auto file_path = get_file_path(state);
 
-        auto start = std::chrono::high_resolution_clock::now();
         std::shared_ptr<io::HdfsFileSystem> fs;
+        THdfsParams hdfs_params = parse_properties(_conf_map);
         RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
         bool res = false;
         RETURN_IF_ERROR(fs->exists(file_path, &res));
         auto end = std::chrono::high_resolution_clock::now();
         auto elapsed_seconds =
                 std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
         state.SetIterationTime(elapsed_seconds.count());
-        bm_log("finish to run {}", _name);
-
         state.counters["ExistsCost"] =
                 benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
         return Status::OK();
     }
 };
diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 7e958cefdb..c2ee8ddd99 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -19,41 +19,213 @@
 
 #include "io/file_factory.h"
 #include "io/fs/benchmark/base_benchmark.h"
+#include "io/fs/file_writer.h"
 #include "io/fs/s3_file_reader.h"
 #include "io/fs/s3_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/s3_uri.h"
 #include "util/slice.h"
 
 namespace doris::io {
 
-class S3ReadBenchmark : public BaseBenchmark {
+class S3Benchmark : public BaseBenchmark {
 public:
-    S3ReadBenchmark(int threads, int iterations, size_t file_size,
-                    const std::map<std::string, std::string>& conf_map)
-            : BaseBenchmark("S3ReadBenchmark", threads, iterations, file_size, 3, conf_map),
-              _result(buffer, 128) {}
-    virtual ~S3ReadBenchmark() = default;
+    S3Benchmark(const std::string& name, int threads, int iterations, size_t file_size,
+                const std::map<std::string, std::string>& conf_map)
+            : BaseBenchmark(name, threads, iterations, file_size, conf_map) {}
+    virtual ~S3Benchmark() = default;
 
-    Status init() override {
-        bm_log("begin to init {}", _name);
-        std::string file_path = _conf_map["file"];
-        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+    // Creates an S3 file system for `path` (an S3 URI) and stores it in `*fs`. Only local
+    // state is written, so benchmark threads can call this concurrently.
+    Status get_fs(const std::string& path, std::shared_ptr<io::S3FileSystem>* fs) {
+        S3URI s3_uri(path);
+        RETURN_IF_ERROR(s3_uri.parse());
+        doris::S3Conf s3_conf;
         RETURN_IF_ERROR(
-                FileFactory::create_s3_reader(_conf_map, file_path, &_fs, &_reader, reader_opts));
-        bm_log("finish to init {}", _name);
+                S3ClientFactory::convert_properties_to_s3_conf(_conf_map, s3_uri, &s3_conf));
+        return io::S3FileSystem::create(std::move(s3_conf), "", fs);
+    }
+
+    // Single-argument form: stores the file system in the shared `_fs` member, so it must
+    // not be called from several benchmark threads at once.
+    Status get_fs(const std::string& path) { return get_fs(path, &_fs); }
+
+protected:
+    std::shared_ptr<io::S3FileSystem> _fs;
+};
+
+class S3OpenReadBenchmark : public S3Benchmark {
+public:
+    S3OpenReadBenchmark(int threads, int iterations, size_t file_size,
+                        const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ReadBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3OpenReadBenchmark() = default;
+
+    // Default read size for this benchmark; applied at the start of run().
+    virtual void set_default_file_size() {
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
+    }
+
+    Status run(benchmark::State& state) override {
+        set_default_file_size();
+        auto file_path = get_file_path(state);
+
+        auto start = std::chrono::high_resolution_clock::now();
+        // create_s3_reader() takes a std::shared_ptr<FileSystem>*; pass a real one instead of
+        // reinterpret_cast-ing a std::shared_ptr<S3FileSystem>*, which is undefined behavior.
+        std::shared_ptr<io::FileSystem> fs;
+        io::FileReaderSPtr reader;
+        io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+        RETURN_IF_ERROR(
+                FileFactory::create_s3_reader(_conf_map, file_path, &fs, &reader, reader_opts));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.counters["OpenReaderTime(S)"] = elapsed_seconds.count();
+        return read(state, reader);
+    }
+};
+
+// Read a single specified file
+class S3SingleReadBenchmark : public S3OpenReadBenchmark {
+public:
+    S3SingleReadBenchmark(int threads, int iterations, size_t file_size,
+                          const std::map<std::string, std::string>& conf_map)
+            : S3OpenReadBenchmark(threads, iterations, file_size, conf_map) {}
+    virtual ~S3SingleReadBenchmark() = default;
+
+    // Keep the configured size as is: 0 means read() reads the whole file.
+    virtual void set_default_file_size() override {}
+
+    virtual std::string get_file_path(benchmark::State& state) override {
+        std::string file_path = _conf_map["file_path"];
+        bm_log("file_path: {}", file_path);
+        return file_path;
+    }
+};
+
+class S3CreateWriteBenchmark : public S3Benchmark {
+public:
+    S3CreateWriteBenchmark(int threads, int iterations, size_t file_size,
+                           const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3CreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3CreateWriteBenchmark() = default;
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        if (_file_size <= 0) {
+            _file_size = 10 * 1024 * 1024; // default 10MB
+        }
+        std::shared_ptr<io::S3FileSystem> fs;
+        RETURN_IF_ERROR(get_fs(file_path, &fs));
+
+        io::FileWriterPtr writer;
+        RETURN_IF_ERROR(fs->create_file(file_path, &writer));
+        return write(state, writer.get());
+    }
+};
+
+class S3ListBenchmark : public S3Benchmark {
+public:
+    S3ListBenchmark(int threads, int iterations, size_t file_size,
+                    const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ListBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3ListBenchmark() = default;
+
+    virtual std::string get_file_path(benchmark::State& state) override {
+        return _conf_map["base_dir"];
+    }
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        std::shared_ptr<io::S3FileSystem> fs;
+        RETURN_IF_ERROR(get_fs(file_path, &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        std::vector<FileInfo> files;
+        bool exists = true;
+        RETURN_IF_ERROR(fs->list(file_path, true, &files, &exists));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ListCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        // Log at most the first three entries as a sanity check.
+        std::stringstream ss;
+        int i = 0;
+        for (auto& file_info : files) {
+            if (i > 2) {
+                break;
+            }
+            ++i;
+            ss << "[" << file_info.file_name << ", " << file_info.file_size << ", "
+               << file_info.is_file << "] ";
+        }
+        bm_log("list files: {}", ss.str());
+
         return Status::OK();
     }
+};
+
+class S3RenameBenchmark : public S3Benchmark {
+public:
+    // A file can be renamed only once, so `iterations` is ignored and both the iteration
+    // and repetition counts are forced to 1.
+    S3RenameBenchmark(int threads, int iterations, size_t file_size,
+                      const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3RenameBenchmark", threads, 1, file_size, conf_map) {
+        set_repetition(1);
+    }
+
+    virtual ~S3RenameBenchmark() = default;
 
     Status run(benchmark::State& state) override {
-        return _reader->read_at(0, _result, &_bytes_read);
+        auto file_path = get_file_path(state);
+        auto new_file_path = file_path + "_new";
+        std::shared_ptr<io::S3FileSystem> fs;
+        RETURN_IF_ERROR(get_fs(file_path, &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["RenameCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        return Status::OK();
     }
+};
 
-private:
-    doris::S3Conf _s3_conf;
-    std::shared_ptr<io::FileSystem> _fs;
-    io::FileReaderSPtr _reader;
-    char buffer[128];
-    doris::Slice _result;
-    size_t _bytes_read = 0;
+class S3ExistsBenchmark : public S3Benchmark {
+public:
+    S3ExistsBenchmark(int threads, int iterations, size_t file_size,
+                      const std::map<std::string, std::string>& conf_map)
+            : S3Benchmark("S3ExistsBenchmark", threads, iterations, file_size, conf_map) {}
+    virtual ~S3ExistsBenchmark() = default;
+
+    Status run(benchmark::State& state) override {
+        auto file_path = get_file_path(state);
+        std::shared_ptr<io::S3FileSystem> fs;
+        RETURN_IF_ERROR(get_fs(file_path, &fs));
+
+        auto start = std::chrono::high_resolution_clock::now();
+        bool res = false;
+        RETURN_IF_ERROR(fs->exists(file_path, &res));
+        auto end = std::chrono::high_resolution_clock::now();
+        auto elapsed_seconds =
+                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+        state.SetIterationTime(elapsed_seconds.count());
+        state.counters["ExistsCost"] =
+                benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+        return Status::OK();
+    }
 };
 
 } // namespace doris::io
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index c6ec1a8b60..48887f9c6e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -24,6 +24,7 @@
 #include "io/fs/s3_common.h"
 #include "runtime/exec_env.h"
 #include "util/defer_op.h"
+#include "util/threadpool.h"
 
 namespace doris {
 namespace io {
@@ -59,26 +60,42 @@ void S3FileBuffer::submit() {
         _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
     }
 
-    ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
-            [buf = this->shared_from_this()]() { buf->_on_upload(); });
+    _thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); });
 }
 
-S3FileBufferPool::S3FileBufferPool() {
+// Splits one allocation of s3_write_buffer_whole_size bytes into slices of
+// s3_write_buffer_size bytes and remembers thread_pool (not owned) for uploads.
+// Not synchronized: call once at startup, before any allocate().
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+                            doris::ThreadPool* thread_pool) {
+    // A second call would replace _whole_mem_buffer while _free_raw_buffers still
+    // points into the old allocation, so only the first call takes effect.
+    if (_whole_mem_buffer != nullptr) {
+        LOG(WARNING) << "S3 file buffer pool is already initialized, ignore re-init";
+        return;
+    }
+    // DCHECK is compiled out of release builds; a non-positive size would make the
+    // division and allocation below undefined or absurd, so reject it unconditionally.
+    CHECK_GT(s3_write_buffer_size, 0);
+    CHECK_GT(s3_write_buffer_whole_size, 0);
+    // S3FileBuffer::submit() dereferences the pool without a null check.
+    DCHECK(thread_pool != nullptr);
     // the nums could be one configuration
-    size_t buf_num = config::s3_write_buffer_whole_size / config::s3_write_buffer_size;
-    DCHECK((config::s3_write_buffer_size >= 5 * 1024 * 1024) &&
-           (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
+           (s3_write_buffer_whole_size > s3_write_buffer_size));
     LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
-    _whole_mem_buffer = std::make_unique<char[]>(config::s3_write_buffer_whole_size);
+    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
     for (size_t i = 0; i < buf_num; i++) {
-        Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
-                 static_cast<size_t>(config::s3_write_buffer_size)};
+        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
+                 static_cast<size_t>(s3_write_buffer_size)};
         _free_raw_buffers.emplace_back(s);
     }
+    _thread_pool = thread_pool;
 }
 
 std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
+    std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
     // if need reserve then we must ensure return buf with memory preserved
     if (reserve) {
         {
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 660cbc8e8a..55fa53df42 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -31,13 +31,14 @@
 #include "util/slice.h"
 
 namespace doris {
+class ThreadPool;
 namespace io {
 
 // TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
 struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     using Callback = std::function<void()>;
 
-    S3FileBuffer() = default;
+    explicit S3FileBuffer(ThreadPool* pool) : _thread_pool(pool) {} // pool: not owned
     ~S3FileBuffer() = default;
 
     void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
@@ -110,13 +111,20 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
     // only served as one reserved buffer
     Slice _buf;
     size_t _append_offset {0};
+    // not owned
+    ThreadPool* _thread_pool = nullptr;
 };
 
 class S3FileBufferPool {
 public:
-    S3FileBufferPool();
+    S3FileBufferPool() = default;
     ~S3FileBufferPool() = default;
 
+    // Must be called exactly once, at startup, before any allocate().
+    // thread_pool is not owned and must outlive every buffer that uploads through it.
+    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+              doris::ThreadPool* thread_pool);
+
     static S3FileBufferPool* GetInstance() {
         static S3FileBufferPool _pool;
         return &_pool;
@@ -135,6 +143,8 @@ private:
     std::condition_variable _cv;
     std::unique_ptr<char[]> _whole_mem_buffer;
     std::list<Slice> _free_raw_buffers;
+    // not owned
+    ThreadPool* _thread_pool = nullptr;
 };
 } // namespace io
 } // namespace doris
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 699f598967..246032ef9a 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -54,6 +54,7 @@
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
+#include "io/fs/s3_file_write_bufferpool.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
@@ -432,6 +433,12 @@ int main(int argc, char** argv) {
     doris::ExecEnv::init(exec_env, paths);
     doris::TabletSchemaCache::create_global_schema_cache();
 
+    // init s3 write buffer pool
+    doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+    s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
+                         doris::config::s3_write_buffer_size,
+                         exec_env->buffered_reader_prefetch_thread_pool());
+
     // init and open storage engine
     doris::EngineOptions options;
     options.store_paths = paths;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org