Posted to commits@doris.apache.org by mo...@apache.org on 2023/06/25 01:35:34 UTC
[doris] branch master updated: [Feature](multi-catalog) Add hdfs benchmark tools. (#21074)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d49c412c59 [Feature](multi-catalog) Add hdfs benchmark tools. (#21074)
d49c412c59 is described below
commit d49c412c59aa94e3c0e5671e7c4f9cb676712542
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Sun Jun 25 09:35:27 2023 +0800
[Feature](multi-catalog) Add hdfs benchmark tools. (#21074)
---
be/src/io/fs/benchmark/base_benchmark.h | 49 ++--
be/src/io/fs/benchmark/benchmark_factory.hpp | 61 ++++-
be/src/io/fs/benchmark/fs_benchmark_tool.cpp | 22 +-
be/src/io/fs/benchmark/hdfs_benchmark.hpp | 299 ++++++++++++++++++++++++
be/src/io/fs/benchmark/s3_benchmark.hpp | 10 +-
bin/run-fs-benchmark.sh | 331 +++++++++++++++++++++++++++
6 files changed, 741 insertions(+), 31 deletions(-)
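For anyone trying the new tool: the wrapper script bin/run-fs-benchmark.sh (added below) forwards its options straight to lib/fs_benchmark_tool, so an HDFS run might look like the sketch below. The config file name is a placeholder and the HDFS connection properties inside it (name node address, user, etc.) depend on your cluster; the numeric values are simply the tool's defaults.

    bin/run-fs-benchmark.sh \
        --conf=hdfs.conf \
        --fs_type=hdfs \
        --operation=create_write \
        --threads=10 \
        --iterations=10 \
        --file_size=104857600
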
diff --git a/be/src/io/fs/benchmark/base_benchmark.h b/be/src/io/fs/benchmark/base_benchmark.h
index bb1c00233f..bcc8ed284c 100644
--- a/be/src/io/fs/benchmark/base_benchmark.h
+++ b/be/src/io/fs/benchmark/base_benchmark.h
@@ -43,44 +43,61 @@ void bm_log(const std::string& fmt, Args&&... args) {
class BaseBenchmark {
public:
- BaseBenchmark(const std::string& name, int iterations,
- const std::map<std::string, std::string>& conf_map)
- : _name(name), _iterations(iterations), _conf_map(conf_map) {}
+ BaseBenchmark(const std::string& name, int threads, int iterations, size_t file_size,
+ int repetitions, const std::map<std::string, std::string>& conf_map)
+ : _name(name),
+ _threads(threads),
+ _iterations(iterations),
+ _file_size(file_size),
+ _repetitions(repetitions),
+ _conf_map(conf_map) {}
virtual ~BaseBenchmark() = default;
virtual Status init() { return Status::OK(); }
- virtual Status run() { return Status::OK(); }
+ virtual Status run(benchmark::State& state) { return Status::OK(); }
void register_bm() {
auto bm = benchmark::RegisterBenchmark(_name.c_str(), [&](benchmark::State& state) {
- // first turn will use more time
Status st;
- st = this->init();
- if (!st) {
- std::cerr << "failed to init. bm: " << _name << ", err: " << st;
- return;
+ if (state.thread_index() == 0) {
+ st = this->init();
}
- st = this->run();
- if (!st) {
- std::cerr << "failed to run at first time. bm: " << _name << ", err: " << st;
+ if (st != Status::OK()) {
+ bm_log("Benchmark {} init error: {}", _name, st.to_string());
return;
}
for (auto _ : state) {
- state.PauseTiming();
- this->init();
- state.ResumeTiming();
- this->run();
+ st = this->run(state);
+ if (st != Status::OK()) {
+ bm_log("Benchmark {} run error: {}", _name, st.to_string());
+ return;
+ }
}
});
+ if (_threads != 0) {
+ bm->Threads(_threads);
+ }
if (_iterations != 0) {
bm->Iterations(_iterations);
}
+ bm->Repetitions(_repetitions);
+
bm->Unit(benchmark::kMillisecond);
+ bm->UseManualTime();
+ bm->ComputeStatistics("max", [](const std::vector<double>& v) -> double {
+ return *(std::max_element(std::begin(v), std::end(v)));
+ });
+ bm->ComputeStatistics("min", [](const std::vector<double>& v) -> double {
+ return *(std::min_element(std::begin(v), std::end(v)));
+ });
}
protected:
std::string _name;
+ int _threads;
int _iterations;
+ size_t _file_size;
+ int _repetitions = 3;
std::map<std::string, std::string> _conf_map;
};
diff --git a/be/src/io/fs/benchmark/benchmark_factory.hpp b/be/src/io/fs/benchmark/benchmark_factory.hpp
index 3f48bd16ce..bc73f904be 100644
--- a/be/src/io/fs/benchmark/benchmark_factory.hpp
+++ b/be/src/io/fs/benchmark/benchmark_factory.hpp
@@ -21,23 +21,43 @@
#include <string>
#include <vector>
+#include "io/fs/benchmark/hdfs_benchmark.hpp"
#include "io/fs/benchmark/s3_benchmark.hpp"
namespace doris::io {
class BenchmarkFactory {
public:
- static Status getBm(const std::string fs_type, const std::string op_type, int64_t iterations,
+ static Status getBm(const std::string fs_type, const std::string op_type, int64_t threads,
+ int64_t iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map, BaseBenchmark** bm);
};
Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_type,
- int64_t iterations,
+ int64_t threads, int64_t iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map,
BaseBenchmark** bm) {
if (fs_type == "s3") {
if (op_type == "read") {
- *bm = new S3ReadBenchmark(iterations, conf_map);
+ *bm = new S3ReadBenchmark(threads, iterations, file_size, conf_map);
+ } else {
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>(
+ "unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
+ iterations);
+ }
+ } else if (fs_type == "hdfs") {
+ if (op_type == "create_write") {
+ *bm = new HdfsCreateWriteBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "open_read") {
+ *bm = new HdfsOpenReadBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "open") {
+ *bm = new HdfsOpenBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "rename") {
+ *bm = new HdfsRenameBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "delete") {
+ *bm = new HdfsDeleteBenchmark(threads, iterations, file_size, conf_map);
+ } else if (op_type == "exists") {
+ *bm = new HdfsExistsBenchmark(threads, iterations, file_size, conf_map);
} else {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unknown params: fs_type: {}, op_type: {}, iterations: {}", fs_type, op_type,
@@ -49,9 +69,15 @@ Status BenchmarkFactory::getBm(const std::string fs_type, const std::string op_t
class MultiBenchmark {
public:
- MultiBenchmark(const std::string& type, const std::string& operation, int64_t iterations,
+ MultiBenchmark(const std::string& type, const std::string& operation, int64_t threads,
+ int64_t iterations, size_t file_size,
const std::map<std::string, std::string>& conf_map)
- : _type(type), _operation(operation), _iterations(iterations), _conf_map(conf_map) {}
+ : _type(type),
+ _operation(operation),
+ _threads(threads),
+ _iterations(iterations),
+ _file_size(file_size),
+ _conf_map(conf_map) {}
~MultiBenchmark() {
for (auto bm : benchmarks) {
@@ -59,11 +85,30 @@ public:
}
}
- Status init_env() { return Status::OK(); }
+ Status init_env() {
+ std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+ if (!doris::config::init(conffile.c_str(), true, true, true)) {
+ fprintf(stderr, "error read config file. \n");
+ return Status::Error<INTERNAL_ERROR>();
+ }
+ doris::CpuInfo::init();
+ Status status = Status::OK();
+ if (doris::config::enable_java_support) {
+ // Init jni
+ status = doris::JniUtil::Init();
+ if (!status.ok()) {
+ LOG(WARNING) << "Failed to initialize JNI: " << status;
+ exit(1);
+ }
+ }
+
+ return Status::OK();
+ }
Status init_bms() {
BaseBenchmark* bm;
- Status st = BenchmarkFactory::getBm(_type, _operation, _iterations, _conf_map, &bm);
+ Status st = BenchmarkFactory::getBm(_type, _operation, _threads, _iterations, _file_size,
+ _conf_map, &bm);
if (!st) {
return st;
}
@@ -76,7 +121,9 @@ private:
std::vector<BaseBenchmark*> benchmarks;
std::string _type;
std::string _operation;
+ int64_t _threads;
int64_t _iterations;
+ size_t _file_size;
std::map<std::string, std::string> _conf_map;
};
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index ad8772bb0f..4002ea84ac 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -22,8 +22,11 @@
#include "io/fs/benchmark/benchmark_factory.hpp"
DEFINE_string(fs_type, "hdfs", "Supported File System: s3, hdfs, local");
-DEFINE_string(operation, "read", "Supported Operations: read, write, open, size, list, connect");
+DEFINE_string(operation, "create_write",
+ "Supported Operations: create_write, open_read, open, rename, delete, exists");
+DEFINE_string(threads, "10", "Number of threads");
DEFINE_string(iterations, "10", "Number of runs");
+DEFINE_string(file_size, "104857600", "File size");
DEFINE_string(conf, "", "config file");
std::string get_usage(const std::string& progname) {
@@ -31,17 +34,25 @@ std::string get_usage(const std::string& progname) {
ss << progname << " is the Doris BE benchmark tool for testing file system.\n";
ss << "Usage:\n";
- ss << progname << " --fs_type=[fs_type] --operation=[op_type] --iterations=10\n";
+ ss << progname
+ << " --fs_type=[fs_type] --operation=[op_type] --threads=10 --iterations=10 "
+ "--file_size=104857600\n";
ss << "\nfs_type:\n";
ss << " hdfs\n";
ss << " s3\n";
ss << "\nop_type:\n";
ss << " read\n";
ss << " write\n";
+ ss << "\nthreads:\n";
+ ss << " num of threads\n";
ss << "\niterations:\n";
ss << " num of run\n";
+ ss << "\nfile_size:\n";
+ ss << " file size\n";
ss << "\nExample:\n";
- ss << progname << " --conf my.conf --fs_type=s3 --operation=read --iterations=100\n";
+ ss << progname
+ << " --conf my.conf --fs_type=s3 --operation=read --threads=10 --iterations=100 "
+ "--file_size=104857600\n";
return ss.str();
}
@@ -93,8 +104,9 @@ int main(int argc, char** argv) {
}
try {
- doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation,
- std::stoi(FLAGS_iterations), conf_map);
+ doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, std::stoi(FLAGS_threads),
+ std::stoi(FLAGS_iterations), std::stol(FLAGS_file_size),
+ conf_map);
doris::Status st = multi_bm.init_env();
if (!st) {
std::cout << "init env failed: " << st << std::endl;
diff --git a/be/src/io/fs/benchmark/hdfs_benchmark.hpp b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
new file mode 100644
index 0000000000..637f7a614a
--- /dev/null
+++ b/be/src/io/fs/benchmark/hdfs_benchmark.hpp
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/file_factory.h"
+#include "io/fs/benchmark/base_benchmark.h"
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/hdfs_file_reader.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/hdfs_builder.h"
+#include "util/jni-util.h"
+#include "util/slice.h"
+
+namespace doris::io {
+
+class HdfsOpenReadBenchmark : public BaseBenchmark {
+public:
+ HdfsOpenReadBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsReadBenchmark", threads, iterations, file_size, 3, conf_map) {}
+ virtual ~HdfsOpenReadBenchmark() = default;
+
+ Status init() override { return Status::OK(); }
+
+ Status run(benchmark::State& state) override {
+ std::shared_ptr<io::FileSystem> fs;
+ io::FileReaderSPtr reader;
+ bm_log("begin to init {}", _name);
+ std::string base_dir = _conf_map["baseDir"];
+ size_t buffer_size =
+ _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ RETURN_IF_ERROR(
+ FileFactory::create_hdfs_reader(hdfs_params, file_path, &fs, &reader, reader_opts));
+ bm_log("finish to init {}", _name);
+
+ bm_log("begin to run {}", _name);
+ Status status;
+ std::vector<char> buffer;
+ buffer.resize(buffer_size);
+ doris::Slice data = {buffer.data(), buffer.size()};
+ size_t offset = 0;
+ size_t bytes_read = 0;
+
+ auto start = std::chrono::high_resolution_clock::now();
+ size_t read_size = _file_size;
+ long remaining_size = read_size;
+
+ while (remaining_size > 0) {
+ bytes_read = 0;
+ size_t size = std::min(buffer_size, (size_t)remaining_size);
+ data.size = size;
+ status = reader->read_at(offset, data, &bytes_read);
+ if (status != Status::OK() || bytes_read < 0) {
+ bm_log("reader read_at error: {}", status.to_string());
+ break;
+ }
+ if (bytes_read == 0) { // EOF
+ break;
+ }
+ offset += bytes_read;
+ remaining_size -= bytes_read;
+ }
+ bm_log("finish to run {}", _name);
+
+ auto end = std::chrono::high_resolution_clock::now();
+
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+
+ if (reader != nullptr) {
+ reader->close();
+ }
+ return status;
+ }
+};
+
+class HdfsOpenBenchmark : public BaseBenchmark {
+public:
+ HdfsOpenBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsOpenBenchmark", threads, iterations, file_size, 3, conf_map) {}
+ virtual ~HdfsOpenBenchmark() = default;
+
+ Status init() override { return Status::OK(); }
+
+ Status run(benchmark::State& state) override {
+ bm_log("begin to run {}", _name);
+ auto start = std::chrono::high_resolution_clock::now();
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ io::FileReaderSPtr reader;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(fs->open_file(file_path, reader_opts, &reader));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+ bm_log("finish to run {}", _name);
+
+ if (reader != nullptr) {
+ reader->close();
+ }
+ return Status::OK();
+ }
+
+private:
+};
+
+class HdfsCreateWriteBenchmark : public BaseBenchmark {
+public:
+ HdfsCreateWriteBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsCreateWriteBenchmark", threads, iterations, file_size, 3,
+ conf_map) {}
+ virtual ~HdfsCreateWriteBenchmark() = default;
+
+ Status init() override {
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(fs->delete_directory(base_dir));
+ return Status::OK();
+ }
+
+ Status run(benchmark::State& state) override {
+ bm_log("begin to run {}", _name);
+ auto start = std::chrono::high_resolution_clock::now();
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ io::FileWriterPtr writer;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(fs->create_file(file_path, &writer));
+ Status status;
+ size_t write_size = _file_size;
+ size_t buffer_size =
+ _conf_map.contains("buffer_size") ? std::stol(_conf_map["buffer_size"]) : 1000000L;
+ long remaining_size = write_size;
+ std::vector<char> buffer;
+ buffer.resize(buffer_size);
+ doris::Slice data = {buffer.data(), buffer.size()};
+ while (remaining_size > 0) {
+ size_t size = std::min(buffer_size, (size_t)remaining_size);
+ data.size = size;
+ status = writer->append(data);
+ if (status != Status::OK()) {
+ bm_log("writer append error: {}", status.to_string());
+ break;
+ }
+ remaining_size -= size;
+ }
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+ bm_log("finish to run {}", _name);
+
+ if (writer != nullptr) {
+ writer->close();
+ }
+ return status;
+ }
+};
+
+class HdfsRenameBenchmark : public BaseBenchmark {
+public:
+ HdfsRenameBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsRenameBenchmark", threads, 1, file_size, 1, conf_map) {}
+ virtual ~HdfsRenameBenchmark() = default;
+
+ Status init() override { return Status::OK(); }
+
+ Status run(benchmark::State& state) override {
+ bm_log("begin to run {}", _name);
+ auto start = std::chrono::high_resolution_clock::now();
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ auto new_file_path = fmt::format("{}/test_{}_new", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ io::FileWriterPtr writer;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(fs->rename(file_path, new_file_path));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+ bm_log("finish to run {}", _name);
+
+ if (writer != nullptr) {
+ writer->close();
+ }
+ return Status::OK();
+ }
+
+private:
+};
+
+class HdfsDeleteBenchmark : public BaseBenchmark {
+public:
+ HdfsDeleteBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsDeleteBenchmark", threads, 1, file_size, 1, conf_map) {}
+ virtual ~HdfsDeleteBenchmark() = default;
+
+ Status init() override { return Status::OK(); }
+
+ Status run(benchmark::State& state) override {
+ bm_log("begin to run {}", _name);
+ auto start = std::chrono::high_resolution_clock::now();
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ RETURN_IF_ERROR(fs->delete_file(file_path));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+ bm_log("finish to run {}", _name);
+ return Status::OK();
+ }
+
+private:
+};
+
+class HdfsExistsBenchmark : public BaseBenchmark {
+public:
+ HdfsExistsBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("HdfsExistsBenchmark", threads, iterations, file_size, 3, conf_map) {}
+ virtual ~HdfsExistsBenchmark() = default;
+
+ Status init() override { return Status::OK(); }
+
+ Status run(benchmark::State& state) override {
+ bm_log("begin to run {}", _name);
+ auto start = std::chrono::high_resolution_clock::now();
+ std::string base_dir = _conf_map["baseDir"];
+ io::FileReaderOptions reader_opts = FileFactory::get_reader_options(nullptr);
+ THdfsParams hdfs_params = parse_properties(_conf_map);
+ auto file_path = fmt::format("{}/test_{}", base_dir, state.thread_index());
+ bm_log("file_path: {}", file_path);
+ std::shared_ptr<io::HdfsFileSystem> fs;
+ RETURN_IF_ERROR(io::HdfsFileSystem::create(hdfs_params, "", &fs));
+ bool res = false;
+ RETURN_IF_ERROR(fs->exists(file_path, &res));
+ auto end = std::chrono::high_resolution_clock::now();
+ auto elapsed_seconds =
+ std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
+
+ state.SetIterationTime(elapsed_seconds.count());
+ bm_log("finish to run {}", _name);
+ return Status::OK();
+ }
+};
+
+} // namespace doris::io
diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 5b9a81aaec..7e958cefdb 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -27,8 +27,10 @@ namespace doris::io {
class S3ReadBenchmark : public BaseBenchmark {
public:
- S3ReadBenchmark(int iterations, const std::map<std::string, std::string>& conf_map)
- : BaseBenchmark("S3ReadBenchmark", iterations, conf_map), _result(buffer, 128) {}
+ S3ReadBenchmark(int threads, int iterations, size_t file_size,
+ const std::map<std::string, std::string>& conf_map)
+ : BaseBenchmark("S3ReadBenchmark", threads, iterations, file_size, 3, conf_map),
+ _result(buffer, 128) {}
virtual ~S3ReadBenchmark() = default;
Status init() override {
@@ -41,7 +43,9 @@ public:
return Status::OK();
}
- Status run() override { return _reader->read_at(0, _result, &_bytes_read); }
+ Status run(benchmark::State& state) override {
+ return _reader->read_at(0, _result, &_bytes_read);
+ }
private:
doris::S3Conf _s3_conf;
diff --git a/bin/run-fs-benchmark.sh b/bin/run-fs-benchmark.sh
new file mode 100755
index 0000000000..bf433ef178
--- /dev/null
+++ b/bin/run-fs-benchmark.sh
@@ -0,0 +1,331 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -eo pipefail
+
+curdir="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
+
+MACHINE_OS=$(uname -s)
+if [[ "$(uname -s)" == 'Darwin' ]] && command -v brew &>/dev/null; then
+ PATH="$(brew --prefix)/opt/gnu-getopt/bin:${PATH}"
+ export PATH
+fi
+
+OPTS="$(getopt \
+ -n "$0" \
+ -o '' \
+ -l 'conf:,fs_type:,operation:,threads:,iterations:,file_size:' \
+ -- "$@")"
+
+eval set -- "${OPTS}"
+
+while true; do
+ case "$1" in
+ --conf)
+ CONF="$2"
+ shift 2
+ ;;
+ --fs_type)
+ FS_TYPE="$2"
+ shift 2
+ ;;
+ --operation)
+ OPERATION="$2"
+ shift 2
+ ;;
+ --threads)
+ THREADS="$2"
+ shift 2
+ ;;
+ --iterations)
+ ITERATIONS="$2"
+ shift 2
+ ;;
+ --file_size)
+ FILE_SIZE="$2"
+ shift 2
+ ;;
+ --)
+ shift
+ break
+ ;;
+ *)
+ echo "Internal error"
+ exit 1
+ ;;
+ esac
+done
+
+echo "CONF: ${CONF}"
+echo "FS_TYPE: ${FS_TYPE}"
+echo "OPERATION: ${OPERATION}"
+echo "THREADS: ${THREADS}"
+echo "ITERATIONS: ${ITERATIONS}"
+echo "FILE_SIZE: ${FILE_SIZE}"
+
+DORIS_HOME="$(
+ cd "${curdir}/.."
+ pwd
+)"
+export DORIS_HOME
+
+if [[ "$(uname -s)" != 'Darwin' ]]; then
+ MAX_MAP_COUNT="$(cat /proc/sys/vm/max_map_count)"
+ if [[ "${MAX_MAP_COUNT}" -lt 2000000 ]]; then
+ echo "Please set vm.max_map_count to be 2000000 under root using 'sysctl -w vm.max_map_count=2000000'."
+ exit 1
+ fi
+fi
+
+MAX_FILE_COUNT="$(ulimit -n)"
+if [[ "${MAX_FILE_COUNT}" -lt 65536 ]]; then
+ echo "Please set the maximum number of open file descriptors to be 65536 using 'ulimit -n 65536'."
+ exit 1
+fi
+
+# add java libs
+for f in "${DORIS_HOME}/lib/java_extensions"/*.jar; do
+ if [[ -z "${DORIS_CLASSPATH}" ]]; then
+ export DORIS_CLASSPATH="${f}"
+ else
+ export DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ fi
+done
+
+if [[ -d "${DORIS_HOME}/lib/hadoop_hdfs/" ]]; then
+ # add hadoop libs
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/common"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/common/lib"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+ for f in "${DORIS_HOME}/lib/hadoop_hdfs/hdfs/lib"/*.jar; do
+ DORIS_CLASSPATH="${f}:${DORIS_CLASSPATH}"
+ done
+fi
+
+# the CLASSPATH and LIBHDFS_OPTS is used for hadoop libhdfs
+# and conf/ dir so that hadoop libhdfs can read .xml config file in conf/
+export CLASSPATH="${DORIS_HOME}/conf/:${DORIS_CLASSPATH}"
+# DORIS_CLASSPATH is for self-managed jni
+export DORIS_CLASSPATH="-Djava.class.path=${DORIS_CLASSPATH}"
+
+jdk_version() {
+ local java_cmd="${1}"
+ local result
+ local IFS=$'\n'
+
+ if [[ -z "${java_cmd}" ]]; then
+ result=no_java
+ return 1
+ else
+ local version
+ # remove \r for Cygwin
+ version="$("${java_cmd}" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n' | grep version | awk '{print $3}')"
+ version="${version//\"/}"
+ if [[ "${version}" =~ ^1\. ]]; then
+ result="$(echo "${version}" | awk -F '.' '{print $2}')"
+ else
+ result="$(echo "${version}" | awk -F '.' '{print $1}')"
+ fi
+ fi
+ echo "${result}"
+ return 0
+}
+
+# export env variables from be.conf
+#
+# LOG_DIR
+# PID_DIR
+export LOG_DIR="${DORIS_HOME}/log"
+PID_DIR="$(
+ cd "${curdir}"
+ pwd
+)"
+export PID_DIR
+
+# set odbc conf path
+export ODBCSYSINI="${DORIS_HOME}/conf"
+
+# support utf8 for oracle database
+export NLS_LANG='AMERICAN_AMERICA.AL32UTF8'
+
+# filter known leak.
+export LSAN_OPTIONS="suppressions=${DORIS_HOME}/conf/lsan_suppr.conf"
+export ASAN_OPTIONS="suppressions=${DORIS_HOME}/conf/asan_suppr.conf"
+
+while read -r line; do
+ envline="$(echo "${line}" |
+ sed 's/[[:blank:]]*=[[:blank:]]*/=/g' |
+ sed 's/^[[:blank:]]*//g' |
+ grep -E "^[[:upper:]]([[:upper:]]|_|[[:digit:]])*=" ||
+ true)"
+ envline="$(eval "echo ${envline}")"
+ if [[ "${envline}" == *"="* ]]; then
+ eval 'export "${envline}"'
+ fi
+done <"${DORIS_HOME}/conf/be.conf"
+
+if [[ -e "${DORIS_HOME}/bin/palo_env.sh" ]]; then
+ # shellcheck disable=1091
+ source "${DORIS_HOME}/bin/palo_env.sh"
+fi
+
+if [[ -z "${JAVA_HOME}" ]]; then
+ echo "The JAVA_HOME environment variable is not defined correctly"
+ echo "This environment variable is needed to run this program"
+ echo "NB: JAVA_HOME should point to a JDK not a JRE"
+ echo "You can set it in be.conf"
+ exit 1
+fi
+
+if [[ ! -d "${LOG_DIR}" ]]; then
+ mkdir -p "${LOG_DIR}"
+fi
+
+pidfile="${PID_DIR}/fs_benchmark_tool.pid"
+
+if [[ -f "${pidfile}" ]]; then
+ if kill -0 "$(cat "${pidfile}")" >/dev/null 2>&1; then
+ echo "Backend running as process $(cat "${pidfile}"). Stop it first."
+ exit 1
+ else
+ rm "${pidfile}"
+ fi
+fi
+
+chmod 755 "${DORIS_HOME}/lib/fs_benchmark_tool"
+echo "start time: $(date)" >>"${LOG_DIR}/fs_benchmark_tool.out"
+
+if [[ ! -f '/bin/limit3' ]]; then
+ LIMIT=''
+else
+ LIMIT="/bin/limit3 -c 0 -n 65536"
+fi
+
+## set asan and ubsan env to generate core file
+export ASAN_OPTIONS=symbolize=1:abort_on_error=1:disable_coredump=0:unmap_shadow_on_exit=1:detect_container_overflow=0
+export UBSAN_OPTIONS=print_stacktrace=1
+
+## set TCMALLOC_HEAP_LIMIT_MB to limit memory used by tcmalloc
+set_tcmalloc_heap_limit() {
+ local total_mem_mb
+ local mem_limit_str
+
+ if [[ "$(uname -s)" != 'Darwin' ]]; then
+ total_mem_mb="$(free -m | grep Mem | awk '{print $2}')"
+ else
+ total_mem_mb="$(($(sysctl -a hw.memsize | awk '{print $NF}') / 1024))"
+ fi
+ mem_limit_str=$(grep ^mem_limit "${DORIS_HOME}"/conf/be.conf)
+ local digits_unit=${mem_limit_str##*=}
+ digits_unit="${digits_unit#"${digits_unit%%[![:space:]]*}"}"
+ digits_unit="${digits_unit%"${digits_unit##*[![:space:]]}"}"
+ local digits=${digits_unit%%[^[:digit:]]*}
+ local unit=${digits_unit##*[[:digit:] ]}
+
+ mem_limit_mb=0
+ case ${unit} in
+ t | T) mem_limit_mb=$((digits * 1024 * 1024)) ;;
+ g | G) mem_limit_mb=$((digits * 1024)) ;;
+ m | M) mem_limit_mb=$((digits)) ;;
+ k | K) mem_limit_mb=$((digits / 1024)) ;;
+ %) mem_limit_mb=$((total_mem_mb * digits / 100)) ;;
+ *) mem_limit_mb=$((digits / 1024 / 1024 / 1024)) ;;
+ esac
+
+ if [[ "${mem_limit_mb}" -eq 0 ]]; then
+ mem_limit_mb=$((total_mem_mb * 90 / 100))
+ fi
+
+ if [[ "${mem_limit_mb}" -gt "${total_mem_mb}" ]]; then
+ echo "mem_limit is larger than whole memory of the server. ${mem_limit_mb} > ${total_mem_mb}."
+ return 1
+ fi
+ export TCMALLOC_HEAP_LIMIT_MB=${mem_limit_mb}
+}
+
+# set_tcmalloc_heap_limit || exit 1
+
+## set hdfs3 conf
+if [[ -f "${DORIS_HOME}/conf/hdfs-site.xml" ]]; then
+ export LIBHDFS3_CONF="${DORIS_HOME}/conf/hdfs-site.xml"
+fi
+
+# check java version and choose correct JAVA_OPTS
+java_version="$(
+ set -e
+ jdk_version "${JAVA_HOME}/bin/java"
+)"
+
+CUR_DATE=$(date +%Y%m%d-%H%M%S)
+LOG_PATH="-DlogPath=${DORIS_HOME}/log/jni.log"
+COMMON_OPTS="-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+JDBC_OPTS="-DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDEL_TIME=300000"
+
+if [[ "${java_version}" -gt 8 ]]; then
+ if [[ -z ${JAVA_OPTS_FOR_JDK_9} ]]; then
+ JAVA_OPTS_FOR_JDK_9="-Xmx1024m ${LOG_PATH} -Xlog:gc:${DORIS_HOME}/log/fs_benchmark_tool.gc.log.${CUR_DATE} ${COMMON_OPTS} ${JDBC_OPTS}"
+ fi
+ final_java_opt="${JAVA_OPTS_FOR_JDK_9}"
+else
+ if [[ -z ${JAVA_OPTS} ]]; then
+ JAVA_OPTS="-Xmx1024m ${LOG_PATH} -Xloggc:${DORIS_HOME}/log/fs_benchmark_tool.gc.log.${CUR_DATE} ${COMMON_OPTS} ${JDBC_OPTS}"
+ fi
+ final_java_opt="${JAVA_OPTS}"
+fi
+
+if [[ "${MACHINE_OS}" == "Darwin" ]]; then
+ max_fd_limit='-XX:-MaxFDLimit'
+
+ if ! echo "${final_java_opt}" | grep "${max_fd_limit/-/\-}" >/dev/null; then
+ final_java_opt="${final_java_opt} ${max_fd_limit}"
+ fi
+
+ if [[ -n "${JAVA_OPTS}" ]] && ! echo "${JAVA_OPTS}" | grep "${max_fd_limit/-/\-}" >/dev/null; then
+ JAVA_OPTS="${JAVA_OPTS} ${max_fd_limit}"
+ fi
+fi
+
+# set LIBHDFS_OPTS for hadoop libhdfs
+export LIBHDFS_OPTS="${final_java_opt}"
+
+#echo "CLASSPATH: ${CLASSPATH}"
+#echo "LD_LIBRARY_PATH: ${LD_LIBRARY_PATH}"
+#echo "LIBHDFS_OPTS: ${LIBHDFS_OPTS}"
+
+# see https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+export JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:30000,dirty_decay_ms:30000,oversize_threshold:0,lg_tcache_max:16,prof_prefix:jeprof.out"
+
+${LIMIT:+${LIMIT}} "${DORIS_HOME}/lib/fs_benchmark_tool" --conf "${CONF}" --fs_type="${FS_TYPE}" --operation="${OPERATION}" --threads="${THREADS}" --iterations="${ITERATIONS}" --file_size="${FILE_SIZE}" 2>&1 | tee "${LOG_DIR}/fs_benchmark_tool.log"
+
+qps="0MB/s"
+latency="0ms"
+
+eval "$(grep "median" "${LOG_DIR}/fs_benchmark_tool.log" | awk '{printf("qps=%sMB/s latency=%sms", "'"${FILE_SIZE}"'" / 1024 / 1024 / ($2 * "'"${THREADS}"'" / 1000), $2 * "'"${THREADS}"'")}')"
+
+echo "------------------------------"
+echo " Benchmark Result "
+echo "------------------------------"
+echo "thread_num: ${THREADS}."
+echo "qps: ${qps}."
+echo "latency: ${latency}."