Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/29 11:03:56 UTC
[doris] branch master updated: [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 41ccf77c7d [feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
41ccf77c7d is described below
commit 41ccf77c7d0409fe3053829c66f7c205e4c03c6e
Author: zhangdong <49...@qq.com>
AuthorDate: Thu Jun 29 19:03:49 2023 +0800
[feature][fix](fs)(s3)add fs_s3 benchmark tool and fix s3 file writer bug (#20926)
1. Fix a bug where fields of s3_file_write_bufferpool were not initialized, causing undefined behavior.
2. Add an fs_s3 benchmark tool. For usage of the tool, see https://github.com/apache/doris/pull/20770.
Also improve the output:
`sh bin/run-fs-benchmark.sh --conf=conf/s3.conf --fs_type=s3 --operation=single_read --threads=1 --iterations=1`
```
------------------------------------------------------------------------------------------------------------------------------
Benchmark                                                                Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------------------------------------------------
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1          7366 ms          123 ms            1 ReadRate(B/S)=12.1823M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1          6163 ms          116 ms            1 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1          6048 ms          110 ms            1 ReadRate(B/S)=14.8366M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_mean     6526 ms          116 ms            3 ReadRate(B/S)=13.8596M/s ReadTime(S)=6.52556 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_median   6163 ms          116 ms            3 ReadRate(B/S)=14.5597M/s ReadTime(S)=6.16299 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_stddev    730 ms         6.68 ms            3 ReadRate(B/S)=1.45914M/s ReadTime(S)=0.729876 ReadTotal(B)=0
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_cv      11.18 %          5.75 %             3 ReadRate(B/S)=10.53% ReadTime(S)=11.18% ReadTotal(B)=0.00%
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_max      7366 ms          123 ms            3 ReadRate(B/S)=14.8366M/s ReadTime(S)=7.36572 ReadTotal(B)=89.7314M
S3ReadBenchmark/iterations:1/repeats:3/manual_time/threads:1_min      6048 ms          110 ms            3 ReadRate(B/S)=12.1823M/s ReadTime(S)=6.04796 ReadTotal(B)=89.7314M
```
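For context, the statistics above are produced by Google Benchmark: each case is registered with RegisterBenchmark, the tool times the I/O itself, and reports it through SetIterationTime plus user counters (kIsRate turns a byte count into a rate, which is where the ReadRate/ReadTime/ReadTotal columns come from). Below is a minimal, self-contained sketch of that pattern, using a sleep as a stand-in for real I/O — mock names, not the tool's actual code:
```
#include <benchmark/benchmark.h>

#include <chrono>
#include <thread>

// Mock stand-in for one read iteration: sleep in place of real S3 I/O.
static void bm_mock_read(benchmark::State& state) {
    constexpr double read_total = 89.0 * 1024 * 1024; // pretend ~89MB was read
    for (auto _ : state) {
        auto start = std::chrono::high_resolution_clock::now();
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        auto end = std::chrono::high_resolution_clock::now();
        auto elapsed =
                std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
        // With manual_time, the reported Time column comes from here.
        state.SetIterationTime(elapsed.count());
        // kIsRate divides the value by the measured time, yielding B/S.
        state.counters["ReadRate(B/S)"] =
                benchmark::Counter(read_total, benchmark::Counter::kIsRate);
        state.counters["ReadTotal(B)"] = read_total;
        state.counters["ReadTime(S)"] = elapsed.count();
    }
}

int main(int argc, char** argv) {
    benchmark::RegisterBenchmark("MockReadBenchmark", bm_mock_read)
            ->Threads(1)
            ->Iterations(1)
            ->Repetitions(3)
            ->UseManualTime();
    benchmark::Initialize(&argc, argv);
    benchmark::RunSpecifiedBenchmarks();
    return 0;
}
```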
---
 be/src/io/fs/benchmark/base_benchmark.h      | 117 +++++++++++++++-
 be/src/io/fs/benchmark/benchmark_factory.hpp |  14 +-
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp |  11 ++
 be/src/io/fs/benchmark/hdfs_benchmark.hpp    | 166 +++++------------------
 be/src/io/fs/benchmark/s3_benchmark.hpp      | 192 ++++++++++++++++++++++++---
 be/src/io/fs/s3_file_write_bufferpool.cpp    |  22 +--
 be/src/io/fs/s3_file_write_bufferpool.h      |  14 +-
 be/src/service/doris_main.cpp                |   7 +
 8 files changed, 371 insertions(+), 172 deletions(-)
diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h
index c28ad02de5..41dae7cea2 100644
--- a/be/src/io/fs/benchmark/base_benchmark.h
+++ b/be/src/io/fs/benchmark/base_benchmark.h
@@ -27,6 +27,9 @@
#include <vector>
#include "common/status.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "util/slice.h"
namespace doris::io {
@@ -44,24 +47,22 @@ void bm_log(const std::string& fmt, Args&&... args) {
class BaseBenchmark {
public:
BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size,
- int repetitions, const std::map<std::string, std::string>& conf_map)
+ const std::map<std::string, std::string>& conf_map)
: _name(name),
_threads(threads),
_iterations(iterations),
_file_size(file_size),
- _repetitions(repetitions),
_conf_map(conf_map) {}
virtual ~BaseBenchmark() = default;
virtual Status init() { return Status::OK(); }
virtual Status run(benchmark::State& state) { return Status::OK(); }
+ void set_repetition(int rep) { _repetitions = rep; }
+
void register_bm() {
auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) {
- Status st;
- if (state.thread_index() == 0) {
- st = this->init();
- }
+ Status st = this->init();
if (st != Status::OK()) {
bm_log("Benchmark {} init error: {}", _name, st.to_string());
return;
@@ -92,12 +93,114 @@ public:
});
}
+ virtual std::string get_file_path(benchmark::State& state) {
+ std::string base_dir = _conf_map["base_dir"];
+ std::string file_path;
+ if (base_dir.ends_with("/")) {
+ file_path = fmt::format("{}test_{}", base_dir, state.thread_index());
+ } else {
+ file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ }
+ bm_log("file_path: {}", file_path);
+ return file_path;
+ }
+
+ Status read(benchmark::State& state, FileReaderSPtr reader) {
+ bm_log("begin to read {}", _name);
+ size_t buffer_size =
+ _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+ std::vector<char> buffer;
+ buffer.resize(buffer_size);
+ doris::Slice data = {buffer.data(), buffer.size()};
+ size_t offset = 0;
+ size_t bytes_read = 0;
+
+ size_t read_size = reader->size();
+ if (_file_size > 0) {
+ read_size = std::min(read_size, _file_size);
+ }
+ long remaining_size = read_size;
+
+ Status status;
+ auto start = std::chrono::high_resolution_clock::now();
+ while (remaining_size > 0) {
+ bytes_read = 0;
+ size_t size = std::min(buffer_size, (size_t)remaining_size);
+ data.size = size;
+ status = reader->read_at(offset, data, &bytes_read);
+ if (status != Status::OK() || bytes_read < 0) {
+ bm_log("reader read_at error: {}", status.to_string());
+ break;
+ }
+ if (bytes_read == 0) { // EOF
+ break;
+ }
+ offset += bytes_read;
+ remaining_size -= bytes_read;
+ }
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+ state.SetIterationTime(elapsed_seconds.count());
+ state.counters["ReadRate(B/S)"] =
+ benchmark::Counter(read_size, benchmark::Counter::kIsRate);
+ state.counters["ReadTotal(B)"] = read_size;
+ state.counters["ReadTime(S)"] = elapsed_seconds.count();
+
+ if (status.ok() && reader != nullptr) {
+ status = reader->close();
+ }
+ bm_log("finish to read {}, size {}, seconds: {}, status: {}", _name, read_size,
+ elapsed_seconds.count(), status);
+ return status;
+ }
+
+ Status write(benchmark::State& state, FileWriter* writer) {
+ bm_log("begin to write {}, size: {}", _name, _file_size);
+ size_t write_size = _file_size;
+ size_t buffer_size =
+ _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+ long remaining_size = write_size;
+ std::vector<char> buffer;
+ buffer.resize(buffer_size);
+ doris::Slice data = {buffer.data(), buffer.size()};
+
+ Status status;
+ auto start = std::chrono::high_resolution_clock::now();
+ while (remaining_size > 0) {
+ size_t size = std::min(buffer_size, (size_t)remaining_size);
+ data.size = size;
+ status = writer->append(data);
+ if (status != Status::OK()) {
+ bm_log("writer append error: {}", status.to_string());
+ break;
+ }
+ remaining_size -= size;
+ }
+ if (status.ok() && writer != nullptr) {
+ status = writer->close();
+ }
+
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+ state.SetIterationTime(elapsed_seconds.count());
+ state.counters["WriteRate(B/S)"] =
+ benchmark::Counter(write_size, benchmark::Counter::kIsRate);
+ state.counters["WriteTotal(B)"] = write_size;
+ state.counters["WriteTime(S)"] = elapsed_seconds.count();
+
+ bm_log("finish to write {}, size: {}, seconds: {}, status: {}", _name, write_size,
+ elapsed_seconds.count(), status);
+ return status;
+ }
+
protected:
std::string _name;
int _threads;
int _iterations;
size_t _file_size;
- int _repetitions = 1;
+ int _repetitions = 3;
std::map<std::string, std::string> _conf_map;
};
diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp
index 3e8c9314ca..0b8af3b96b 100644
--- a/be/src/io/fs/benchmark/benchmark_factory.hpp
+++ b/be/src/io/fs/benchmark/benchmark_factory.hpp
@@ -38,8 +38,18 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
const std::map<std::string, std::string>& conf_map,
BaseBenchmark** bm) {
if (fs_type == "s3") {
- if (op_type == "read") {
- *bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map);
+ if (op_type == "create_write") {
+ *bm = new S3CreateWriteBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "open_read") {
+ *bm = new S3OpenReadBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "single_read") {
+ *bm = new S3SingleReadBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "rename") {
+ *bm = new S3RenameBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "exists") {
+ *bm = new S3ExistsBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "list") {
+ *bm = new S3ListBenchmark(threads, iterations, file_size, conf_map);
} else {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index a5be5db80a..50085ae1e7 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -20,6 +20,8 @@
#include <fstream>
#include "io/fs/benchmark/benchmark_factory.hpp"
+#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/threadpool.h"
DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs");
DEFINE_string(operation, "create_write",
@@ -107,6 +109,15 @@ int main(int argc, char** argv) {
return 1;
}
+ // init s3 write buffer pool
+ std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
+ doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+ .set_min_threads(16)
+ .set_max_threads(64)
+ .build(&buffered_reader_prefetch_thread_pool);
+ doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+ s3_buffer_pool->init(524288000, 5242880, buffered_reader_prefetch_thread_pool.get());
+
try {
doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size),
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
index 1307ddc95a..b508e14a24 100644
--- a/be/src/io/fs/benchmark/hdfs_benchmark.hpp
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -33,75 +33,30 @@ class HdfsOpenReadBenchmark : public BaseBenchmark {
public:
HdfsOpenReadBenchmark(int threads, int iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, 3, conf_map) {}
+ : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, conf_map) {}
virtual ~HdfsOpenReadBenchmark() = default;
- Status init() override { return Status::OK(); }
-
- virtual std::string get_file_path(benchmark::State& state) {
- std::string base_dir = _conf_map["base_dir"];
- auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
- bm_log("file_path: {}", file_path);
- return file_path;
+ virtual void set_default_file_size() {
+ if (_file_size <= 0) {
+ _file_size = 10 * 1024 * 1024; // default 10MB
+ }
}
Status run(benchmark::State& state) override {
+ auto file_path = get_file_path(state);
+
+ auto start = std::chrono::high_resolution_clock::now();
std::shared_ptr<io::FileSystem> fs;
io::FileReaderSPtr reader;
- bm_log("begin to init {}", _name);
- size_t buffer_size =
- _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
THdfsParams hdfs_params = parse_properties(_conf_map);
-
- auto file_path = get_file_path(state);
RETURN_IF_ERROR(
FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
- bm_log("finish to init {}", _name);
-
- bm_log("begin to run {}", _name);
- Status status;
- std::vector<char> buffer;
- buffer.resize(buffer_size);
- doris::Slice data = {buffer.data(), buffer.size()};
- size_t offset = 0;
- size_t bytes_read = 0;
-
- size_t read_size = reader->size();
- if (_file_size > 0) {
- read_size = std::min(read_size, _file_size);
- }
- long remaining_size = read_size;
-
- auto start = std::chrono::high_resolution_clock::now();
- while (remaining_size > 0) {
- bytes_read = 0;
- size_t size = std::min(buffer_size, (size_t)remaining_size);
- data.size = size;
- status = reader->read_at(offset, data, &bytes_read);
- if (status != Status::OK() || bytes_read < 0) {
- bm_log("reader read_at error: {}", status.to_string());
- break;
- }
- if (bytes_read == 0) { // EOF
- break;
- }
- offset += bytes_read;
- remaining_size -= bytes_read;
- }
- bm_log("finish to run {}", _name);
auto end = std::chrono::high_resolution_clock::now();
-
auto elapsed_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
- state.SetIterationTime(elapsed_seconds.count());
- state.counters["ReadRate"] = benchmark::Counter(read_size, benchmark::Counter::kIsRate);
-
- if (reader != nullptr) {
- reader->close();
- }
- return status;
+ state.counters["OpenReaderTime(S)"] = elapsed_seconds.count();
+ return read(state, reader);
}
};
@@ -113,6 +68,10 @@ public:
: HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map) {}
virtual ~HdfsSingleReadBenchmark() = default;
+ virtual void set_default_file_size() override {
+ // do nothing, default is 0, which means it will read the whole file
+ }
+
virtual std::string get_file_path(benchmark::State& state) override {
std::string file_path = _conf_map["file_path"];
bm_log("file_path: {}", file_path);
@@ -124,56 +83,20 @@ class HdfsCreateWriteBenchmark : public BaseBenchmark {
public:
HdfsCreateWriteBenchmark(int threads, int iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, 3,
- conf_map) {}
+ : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
virtual ~HdfsCreateWriteBenchmark() = default;
- Status init() override { return Status::OK(); }
-
Status run(benchmark::State& state) override {
- bm_log("begin to run {}", _name);
- std::string base_dir = _conf_map["base_dir"];
- io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
- THdfsParams hdfs_params = parse_properties(_conf_map);
- auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
- bm_log("file_path: {}", file_path);
-
- auto start = std::chrono::high_resolution_clock::now();
+ auto file_path = get_file_path(state);
+ if (_file_size <= 0) {
+ _file_size = 10 * 1024 * 1024; // default 10MB
+ }
std::shared_ptr<io::HdfsFileSystem> fs;
io::FileWriterPtr writer;
+ THdfsParams hdfs_params = parse_properties(_conf_map);
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
RETURN_IF_ERROR(fs->create_file(file_path, &writer));
- Status status;
- size_t write_size = _file_size;
- size_t buffer_size =
- _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
- long remaining_size = write_size;
- std::vector<char> buffer;
- buffer.resize(buffer_size);
- doris::Slice data = {buffer.data(), buffer.size()};
- while (remaining_size > 0) {
- size_t size = std::min(buffer_size, (size_t)remaining_size);
- data.size = size;
- status = writer->append(data);
- if (status != Status::OK()) {
- bm_log("writer append error: {}", status.to_string());
- break;
- }
- remaining_size -= size;
- }
- auto end = std::chrono::high_resolution_clock::now();
- auto elapsed_seconds =
- std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
- state.SetIterationTime(elapsed_seconds.count());
- bm_log("finish to run {}", _name);
-
- state.counters["WriteRate"] = benchmark::Counter(write_size, benchmark::Counter::kIsRate);
-
- if (writer != nullptr) {
- writer->close();
- }
- return status;
+ return write(state, writer.get());
}
};
@@ -181,75 +104,56 @@ class HdfsRenameBenchmark : public BaseBenchmark {
public:
HdfsRenameBenchmark(int threads, int iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, 1, conf_map) {}
+ : BaseBenchmark("HdfsRenameBenchmark", threads, iterations, file_size, conf_map) {
+ // rename can only do once
+ set_repetition(1);
+ }
virtual ~HdfsRenameBenchmark() = default;
- Status init() override { return Status::OK(); }
-
Status run(benchmark::State& state) override {
- bm_log("begin to run {}", _name);
- std::string base_dir = _conf_map["base_dir"];
- io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ auto file_path = get_file_path(state);
+ auto new_file_path = file_path + "_new";
THdfsParams hdfs_params = parse_properties(_conf_map);
- auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
- auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
- bm_log("file_path: {}", file_path);
-
- auto start = std::chrono::high_resolution_clock::now();
std::shared_ptr<io::HdfsFileSystem> fs;
- io::FileWriterPtr writer;
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+ auto start = std::chrono::high_resolution_clock::now();
RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
auto end = std::chrono::high_resolution_clock::now();
auto elapsed_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
state.SetIterationTime(elapsed_seconds.count());
- bm_log("finish to run {}", _name);
-
state.counters["RenameCost"] =
benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
- if (writer != nullptr) {
- writer->close();
- }
return Status::OK();
}
-
-private:
};
class HdfsExistsBenchmark : public BaseBenchmark {
public:
HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, 3, conf_map) {}
+ : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, conf_map) {}
virtual ~HdfsExistsBenchmark() = default;
- Status init() override { return Status::OK(); }
-
Status run(benchmark::State& state) override {
- bm_log("begin to run {}", _name);
- std::string base_dir = _conf_map["base_dir"];
- io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
- THdfsParams hdfs_params = parse_properties(_conf_map);
- auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
- bm_log("file_path: {}", file_path);
+ auto file_path = get_file_path(state);
- auto start = std::chrono::high_resolution_clock::now();
std::shared_ptr<io::HdfsFileSystem> fs;
+ THdfsParams hdfs_params = parse_properties(_conf_map);
RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+
+ auto start = std::chrono::high_resolution_clock::now();
bool res = false;
RETURN_IF_ERROR(fs->exists(file_path, &res));
auto end = std::chrono::high_resolution_clock::now();
auto elapsed_seconds =
std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
-
state.SetIterationTime(elapsed_seconds.count());
- bm_log("finish to run {}", _name);
-
state.counters["ExistsCost"] =
benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
return Status::OK();
}
};
diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 7e958cefdb..c2ee8ddd99 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -19,41 +19,193 @@
#include "io/file_factory.h"
#include "io/fs/benchmark/base_benchmark.h"
+#include "io/fs/file_writer.h"
#include "io/fs/s3_file_reader.h"
#include "io/fs/s3_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/s3_uri.h"
#include "util/slice.h"
namespace doris::io {
-class S3ReadBenchmark : public BaseBenchmark {
+class S3Benchmark : public BaseBenchmark {
public:
- S3ReadBenchmark(int threads, int iterations, size_t file_size,
- const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("S3ReadBenchmark", threads, iterations, file_size, 3, conf_map),
- _result(buffer, 128) {}
- virtual ~S3ReadBenchmark() = default;
+ S3Benchmark(const std::string& name, int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark(name, threads, iterations, file_size, conf_map) {}
+ virtual ~S3Benchmark() = default;
- Status init() override {
- bm_log("begin to init {}", _name);
- std::string file_path = _conf_map["file"];
- io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ Status get_fs(const std::string& path) {
+ S3URI s3_uri(path);
+ RETURN_IF_ERROR(s3_uri.parse());
RETURN_IF_ERROR(
- FileFactory::create_s3_reader(_conf_map, file_path, &_fs, &_reader, reader_opts));
- bm_log("finish to init {}", _name);
+ S3ClientFactory::convert_properties_to_s3_conf(_conf_map, s3_uri, &_s3_conf));
+ return io::S3FileSystem::create(std::move(_s3_conf), "", &_fs);
+ }
+
+protected:
+ doris::S3Conf _s3_conf;
+ std::shared_ptr<io::S3FileSystem> _fs;
+};
+
+class S3OpenReadBenchmark : public S3Benchmark {
+public:
+ S3OpenReadBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3Benchmark("S3ReadBenchmark", threads, iterations, file_size, conf_map) {}
+ virtual ~S3OpenReadBenchmark() = default;
+
+ virtual void set_default_file_size() {
+ if (_file_size <= 0) {
+ _file_size = 10 * 1024 * 1024; // default 10MB
+ }
+ }
+
+ Status run(benchmark::State& state) override {
+ auto file_path = get_file_path(state);
+ RETURN_IF_ERROR(get_fs(file_path));
+
+ io::FileReaderSPtr reader;
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ RETURN_IF_ERROR(FileFactory::create_s3_reader(
+ _conf_map, file_path, reinterpret_cast<std::shared_ptr<io::FileSystem>*>(&_fs),
+ &reader, reader_opts));
+
+ return read(state, reader);
+ }
+};
+
+// Read a single specified file
+class S3SingleReadBenchmark : public S3OpenReadBenchmark {
+public:
+ S3SingleReadBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3OpenReadBenchmark(threads, iterations, file_size, conf_map) {}
+ virtual ~S3SingleReadBenchmark() = default;
+
+ virtual void set_default_file_size() override {}
+
+ virtual std::string get_file_path(benchmark::State& state) override {
+ std::string file_path = _conf_map["file_path"];
+ bm_log("file_path: {}", file_path);
+ return file_path;
+ }
+};
+
+class S3CreateWriteBenchmark : public S3Benchmark {
+public:
+ S3CreateWriteBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3Benchmark("S3CreateWriteBenchmark", threads, iterations, file_size, conf_map) {}
+ virtual ~S3CreateWriteBenchmark() = default;
+
+ Status run(benchmark::State& state) override {
+ auto file_path = get_file_path(state);
+ if (_file_size <= 0) {
+ _file_size = 10 * 1024 * 1024; // default 10MB
+ }
+ RETURN_IF_ERROR(get_fs(file_path));
+
+ io::FileWriterPtr writer;
+ RETURN_IF_ERROR(_fs->create_file(file_path, &writer));
+ return write(state, writer.get());
+ }
+};
+
+class S3ListBenchmark : public S3Benchmark {
+public:
+ S3ListBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3Benchmark("S3ListBenchmark", threads, iterations, file_size, conf_map) {}
+ virtual ~S3ListBenchmark() = default;
+
+ virtual std::string get_file_path(benchmark::State& state) override {
+ return _conf_map["base_dir"];
+ }
+
+ Status run(benchmark::State& state) override {
+ auto file_path = get_file_path(state);
+ RETURN_IF_ERROR(get_fs(file_path));
+
+ auto start = std::chrono::high_resolution_clock::now();
+ std::vector<FileInfo> files;
+ bool exists = true;
+ RETURN_IF_ERROR(_fs->list(file_path, true, &files, &exists));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+ state.SetIterationTime(elapsed_seconds.count());
+ state.counters["ListCost"] =
+ benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+ std::stringstream ss;
+ int i = 0;
+ for (auto& file_info : files) {
+ if (i > 2) {
+ break;
+ }
+ ++i;
+ ss << "[" << file_info.file_name << ", " << file_info.file_size << ", "
+ << file_info.is_file << "] ";
+ }
+ bm_log("list files: {}", ss.str());
+
return Status::OK();
}
+};
+
+class S3RenameBenchmark : public S3Benchmark {
+public:
+ S3RenameBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3Benchmark("S3RenameBenchmark", threads, iterations, file_size, conf_map) {
+ // rename can only do once
+ set_repetition(1);
+ }
+
+ virtual ~S3RenameBenchmark() = default;
Status run(benchmark::State& state) override {
- return _reader->read_at(0, _result, &_bytes_read);
+ auto file_path = get_file_path(state);
+ auto new_file_path = file_path + "_new";
+ RETURN_IF_ERROR(get_fs(file_path));
+
+ auto start = std::chrono::high_resolution_clock::now();
+ RETURN_IF_ERROR(_fs->rename(file_path, new_file_path));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+ state.SetIterationTime(elapsed_seconds.count());
+ state.counters["RenameCost"] =
+ benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+ return Status::OK();
}
+};
-private:
- doris::S3Conf _s3_conf;
- std::shared_ptr<io::FileSystem> _fs;
- io::FileReaderSPtr _reader;
- char buffer[128];
- doris::Slice _result;
- size_t _bytes_read = 0;
+class S3ExistsBenchmark : public S3Benchmark {
+public:
+ S3ExistsBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : S3Benchmark("S3ExistsBenchmark", threads, iterations, file_size, conf_map) {}
+ virtual ~S3ExistsBenchmark() = default;
+
+ Status run(benchmark::State& state) override {
+ auto file_path = get_file_path(state);
+ RETURN_IF_ERROR(get_fs(file_path));
+
+ auto start = std::chrono::high_resolution_clock::now();
+ bool res = false;
+ RETURN_IF_ERROR(_fs->exists(file_path, &res));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+ state.SetIterationTime(elapsed_seconds.count());
+ state.counters["ExistsCost"] =
+ benchmark::Counter(1, benchmark::Counter::kIsRate | benchmark::Counter::kInvert);
+
+ return Status::OK();
+ }
};
} // namespace doris::io
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index c6ec1a8b60..48887f9c6e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -24,6 +24,7 @@
#include "io/fs/s3_common.h"
#include "runtime/exec_env.h"
#include "util/defer_op.h"
+#include "util/threadpool.h"
namespace doris {
namespace io {
@@ -59,26 +60,27 @@ void S3FileBuffer::submit() {
_stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
}
- ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
- [buf = this->shared_from_this()]() { buf->_on_upload(); });
+ _thread_pool->submit_func([buf = this->shared_from_this()]() { buf->_on_upload(); });
}
-S3FileBufferPool::S3FileBufferPool() {
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+ doris::ThreadPool* thread_pool) {
// the nums could be one configuration
- size_t buf_num = config::s3_write_buffer_whole_size / config::s3_write_buffer_size;
- DCHECK((config::s3_write_buffer_size >= 5 * 1024 * 1024) &&
- (config::s3_write_buffer_whole_size > config::s3_write_buffer_size));
+ size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+ DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
+ (s3_write_buffer_whole_size > s3_write_buffer_size));
LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
- _whole_mem_buffer = std::make_unique<char[]>(config::s3_write_buffer_whole_size);
+ _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
for (size_t i = 0; i < buf_num; i++) {
- Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
- static_cast<size_t>(config::s3_write_buffer_size)};
+ Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
+ static_cast<size_t>(s3_write_buffer_size)};
_free_raw_buffers.emplace_back(s);
}
+ _thread_pool = thread_pool;
}
std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
- std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
+ std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>(_thread_pool);
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index 660cbc8e8a..55fa53df42 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -31,13 +31,14 @@
#include "util/slice.h"
namespace doris {
+class ThreadPool;
namespace io {
// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
using Callback = std::function<void()>;
- S3FileBuffer() = default;
+ S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
~S3FileBuffer() = default;
void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
@@ -110,13 +111,20 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
// only served as one reserved buffer
Slice _buf;
size_t _append_offset {0};
+ // not owned
+ ThreadPool* _thread_pool = nullptr;
};
class S3FileBufferPool {
public:
- S3FileBufferPool();
+ S3FileBufferPool() = default;
~S3FileBufferPool() = default;
+ // should be called one and only once
+ // at startup
+ void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+ doris::ThreadPool* thread_pool);
+
static S3FileBufferPool* GetInstance() {
static S3FileBufferPool _pool;
return &_pool;
@@ -135,6 +143,8 @@ private:
std::condition_variable _cv;
std::unique_ptr<char[]> _whole_mem_buffer;
std::list<Slice> _free_raw_buffers;
+ // not owned
+ ThreadPool* _thread_pool = nullptr;
};
} // namespace io
} // namespace doris
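The bufferpool refactor above follows a deliberate pattern: the singleton's constructor no longer reads global config or touches ExecEnv; instead, an explicit init() is called once at startup with its dependencies passed in, which is what lets both doris_main and the standalone benchmark tool wire it up independently. A generic sketch of that pattern follows — a simplified, hypothetical class, not the actual Doris code:
```
#include <cassert>
#include <cstddef>
#include <memory>

class ThreadPool; // forward-declared: the pool only borrows a pointer

class BufferPool {
public:
    // Lazily constructed, but the constructor does no work, so there is
    // no ordering hazard with config loading.
    static BufferPool* instance() {
        static BufferPool pool;
        return &pool;
    }

    // Called exactly once at startup. Dependencies are passed in rather
    // than read from globals, so a server and a standalone tool can each
    // supply their own sizes and thread pool.
    void init(std::size_t whole_size, std::size_t buffer_size, ThreadPool* thread_pool) {
        assert(!_mem && "init() must be called only once");
        assert(whole_size > buffer_size);
        _mem = std::make_unique<char[]>(whole_size);
        _buffer_size = buffer_size;
        _thread_pool = thread_pool; // not owned
    }

private:
    BufferPool() = default;
    std::unique_ptr<char[]> _mem;
    std::size_t _buffer_size = 0;
    ThreadPool* _thread_pool = nullptr; // not owned
};

int main() {
    // Mirrors the call sites in doris_main.cpp and fs_benchmark_tool.cpp:
    // a 500MB whole buffer split into 5MB pieces.
    BufferPool::instance()->init(524288000, 5242880, /*thread_pool=*/nullptr);
    return 0;
}
```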
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 699f598967..246032ef9a 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -54,6 +54,7 @@
#include "common/signal_handler.h"
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
+#include "io/fs/s3_file_write_bufferpool.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
@@ -432,6 +433,12 @@ int main(int argc, char** argv) {
doris::ExecEnv::init(exec_env, paths);
doris::TabletSchemaCache::create_global_schema_cache();
+ // init s3 write buffer pool
+ doris::io::S3FileBufferPool* s3_buffer_pool = doris::io::S3FileBufferPool::GetInstance();
+ s3_buffer_pool->init(doris::config::s3_write_buffer_whole_size,
+ doris::config::s3_write_buffer_size,
+ exec_env->buffered_reader_prefetch_thread_pool());
+
// init and open storage engine
doris::EngineOptions options;
options.store_paths = paths;