You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/23 08:06:34 UTC
[doris] branch master updated: [fix] Move s3 fs connect outside the lock critical area (#11026)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0b6d2ae290 [fix] Move s3 fs connect outside the lock critical area (#11026)
0b6d2ae290 is described below
commit 0b6d2ae290eb5f88a229988638d5fc84eabe3a06
Author: plat1ko <pl...@gmail.com>
AuthorDate: Sat Jul 23 16:06:29 2022 +0800
[fix] Move s3 fs connect outside the lock critical area (#11026)
* fix potential bug of S3FileSystem
* move s3 fs connect outside the lock critical area
---
be/src/agent/task_worker_pool.cpp | 4 +-
be/src/io/fs/s3_common.h | 2 +
be/src/io/fs/s3_file_reader.cpp | 3 +
be/src/io/fs/s3_file_system.cpp | 93 ++++++++++++++++--------------
be/src/io/fs/s3_file_system.h | 12 ++--
be/src/olap/storage_policy_mgr.cpp | 97 +++++++++++++++++---------------
be/src/olap/storage_policy_mgr.h | 6 +-
be/src/olap/tablet.h | 2 -
be/src/util/s3_util.cpp | 28 +++++++++
be/src/util/s3_util.h | 28 ++++++++-
be/test/CMakeLists.txt | 1 -
be/test/olap/rowset/beta_rowset_test.cpp | 15 ++---
be/test/olap/tablet_clone_test.cpp | 13 +++--
be/test/olap/tablet_cooldown_test.cpp | 28 +++++----
14 files changed, 206 insertions(+), 126 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index dafa6be519..1380331e7a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1753,7 +1753,7 @@ void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
policy_ptr->md5_sum = iter.md5_checksum;
LOG_EVERY_N(INFO, 12) << "refresh storage policy task, policy " << *policy_ptr;
- spm->periodic_put(iter.policy_name, std::move(policy_ptr));
+ spm->periodic_put(iter.policy_name, policy_ptr);
}
}
}
@@ -1796,7 +1796,7 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
LOG(INFO) << "get storage update policy task, update policy " << *policy_ptr;
- spm->update(get_storage_policy_req.policy_name, std::move(policy_ptr));
+ spm->update(get_storage_policy_req.policy_name, policy_ptr);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
}
diff --git a/be/src/io/fs/s3_common.h b/be/src/io/fs/s3_common.h
index 41ff04ee3a..2f42045522 100644
--- a/be/src/io/fs/s3_common.h
+++ b/be/src/io/fs/s3_common.h
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#pragma once
+
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index fa5d36e38c..a75da54bb0 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -70,6 +70,9 @@ Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
request.SetResponseStreamFactory(AwsWriteableStreamFactory(to, bytes_req));
auto client = _fs->get_client();
+ if (!client) {
+ return Status::InternalError("init s3 client error");
+ }
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess()) {
return Status::IOError(fmt::format("failed to read from {}: {}", _path.native(),
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 53be081e3f..e421863774 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -37,14 +37,18 @@
namespace doris {
namespace io {
-S3FileSystem::S3FileSystem(const std::map<std::string, std::string>& properties, std::string bucket,
- std::string prefix, ResourceId resource_id)
- : RemoteFileSystem(Path(properties.at(S3_ENDPOINT)) / bucket / prefix,
- std::move(resource_id), FileSystemType::S3),
- _properties(properties),
- _bucket(std::move(bucket)),
- _prefix(std::move(prefix)) {
- _endpoint = properties.at(S3_ENDPOINT);
+// Returns Status::InternalError from the enclosing function when `client` is null.
+// The body is wrapped in do/while(false) so that `CHECK_S3_CLIENT(c);` is exactly one
+// statement and cannot capture a following `else`; the argument is parenthesized so
+// any expression can be passed safely.
+#ifndef CHECK_S3_CLIENT
+#define CHECK_S3_CLIENT(client) \
+ do { \
+ if (!(client)) { \
+ return Status::InternalError("init s3 client error"); \
+ } \
+ } while (false)
+#endif
+
+S3FileSystem::S3FileSystem(S3Conf s3_conf, ResourceId resource_id)
+ : RemoteFileSystem(
+ fmt::format("{}/{}/{}", s3_conf.endpoint, s3_conf.bucket, s3_conf.prefix),
+ std::move(resource_id), FileSystemType::S3),
+ _s3_conf(std::move(s3_conf)) {
_executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
resource_id.c_str(), config::s3_transfer_executor_pool_size);
}
@@ -53,13 +57,16 @@ S3FileSystem::~S3FileSystem() = default;
Status S3FileSystem::connect() {
std::lock_guard lock(_client_mu);
- _client = ClientFactory::instance().create(_properties);
+ _client = ClientFactory::instance().create(_s3_conf);
+ if (!_client) {
+ return Status::InternalError("failed to init s3 client with {}", _s3_conf.to_string());
+ }
return Status::OK();
}
Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
auto client = get_client();
- DCHECK(client);
+ CHECK_S3_CLIENT(client);
Aws::Transfer::TransferManagerConfiguration transfer_config(_executor.get());
transfer_config.s3Client = client;
@@ -68,22 +75,23 @@ Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
auto start = std::chrono::steady_clock::now();
auto key = get_key(dest_path);
- auto handle = transfer_manager->UploadFile(local_path.native(), _bucket, key, "text/plain",
- Aws::Map<Aws::String, Aws::String>());
+ auto handle = transfer_manager->UploadFile(local_path.native(), _s3_conf.bucket, key,
+ "text/plain", Aws::Map<Aws::String, Aws::String>());
handle->WaitUntilFinished();
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
- return Status::IOError(fmt::format("failed to upload(endpoint={}, bucket={}, key={}): {}",
- _endpoint, _bucket, key,
- handle->GetLastError().GetMessage()));
+ return Status::IOError("failed to upload(endpoint={}, bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, key,
+ handle->GetLastError().GetMessage());
}
auto file_size = std::filesystem::file_size(local_path);
- LOG(INFO) << "Upload " << local_path.native() << " to s3, endpoint=" << _endpoint
- << ", bucket=" << _bucket << ", key=" << key << ", duration=" << duration.count()
- << ", capacity=" << file_size << ", tp=" << (file_size) / duration.count();
+ LOG(INFO) << "Upload " << local_path.native() << " to s3, endpoint=" << _s3_conf.endpoint
+ << ", bucket=" << _s3_conf.bucket << ", key=" << key
+ << ", duration=" << duration.count() << ", capacity=" << file_size
+ << ", tp=" << (file_size) / duration.count();
return Status::OK();
}
@@ -91,7 +99,7 @@ Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths,
const std::vector<Path>& dest_paths) {
auto client = get_client();
- DCHECK(client);
+ CHECK_S3_CLIENT(client);
if (local_paths.size() != dest_paths.size()) {
return Status::InvalidArgument("local_paths.size() != dest_paths.size()");
@@ -105,10 +113,11 @@ Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths,
for (int i = 0; i < local_paths.size(); ++i) {
auto key = get_key(dest_paths[i]);
LOG(INFO) << "Start to upload " << local_paths[i].native()
- << " to s3, endpoint=" << _endpoint << ", bucket=" << _bucket << ", key=" << key;
+ << " to s3, endpoint=" << _s3_conf.endpoint << ", bucket=" << _s3_conf.bucket
+ << ", key=" << key;
auto handle =
- transfer_manager->UploadFile(local_paths[i].native(), _bucket, key, "text/plain",
- Aws::Map<Aws::String, Aws::String>());
+ transfer_manager->UploadFile(local_paths[i].native(), _s3_conf.bucket, key,
+ "text/plain", Aws::Map<Aws::String, Aws::String>());
handles.push_back(std::move(handle));
}
for (auto& handle : handles) {
@@ -129,25 +138,25 @@ Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
size_t fsize = 0;
RETURN_IF_ERROR(file_size(path, &fsize));
auto key = get_key(path);
- auto fs_path = Path(_endpoint) / _bucket / key;
- *reader = std::make_unique<S3FileReader>(std::move(fs_path), fsize, std::move(key), _bucket,
- this);
+ auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;
+ *reader = std::make_unique<S3FileReader>(std::move(fs_path), fsize, std::move(key),
+ _s3_conf.bucket, this);
return Status::OK();
}
Status S3FileSystem::delete_file(const Path& path) {
auto client = get_client();
- DCHECK(client);
+ CHECK_S3_CLIENT(client);
Aws::S3::Model::DeleteObjectRequest request;
auto key = get_key(path);
- request.WithBucket(_bucket).WithKey(key);
+ request.WithBucket(_s3_conf.bucket).WithKey(key);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess()) {
- return Status::IOError(
- fmt::format("failed to delete object(endpoint={}, bucket={}, key={}): {}",
- _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+ return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, key,
+ outcome.GetError().GetMessage());
}
return Status::OK();
}
@@ -166,11 +175,11 @@ Status S3FileSystem::link_file(const Path& src, const Path& dest) {
Status S3FileSystem::exists(const Path& path, bool* res) const {
auto client = get_client();
- DCHECK(client);
+ CHECK_S3_CLIENT(client);
Aws::S3::Model::HeadObjectRequest request;
auto key = get_key(path);
- request.WithBucket(_bucket).WithKey(key);
+ request.WithBucket(_s3_conf.bucket).WithKey(key);
- auto outcome = _client->HeadObject(request);
+ // Issue the request through the local `client` that was null-checked above rather
+ // than the shared `_client` member, which connect() may reassign concurrently.
+ auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
@@ -178,28 +187,28 @@ Status S3FileSystem::exists(const Path& path, bool* res) const {
} else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
*res = false;
} else {
- return Status::IOError(
- fmt::format("failed to get object head(endpoint={}, bucket={}, key={}): {}",
- _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+ return Status::IOError("failed to get object head(endpoint={}, bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, key,
+ outcome.GetError().GetMessage());
}
return Status::OK();
}
Status S3FileSystem::file_size(const Path& path, size_t* file_size) const {
auto client = get_client();
- DCHECK(client);
+ CHECK_S3_CLIENT(client);
Aws::S3::Model::HeadObjectRequest request;
auto key = get_key(path);
- request.WithBucket(_bucket).WithKey(key);
+ request.WithBucket(_s3_conf.bucket).WithKey(key);
- auto outcome = _client->HeadObject(request);
+ // Issue the request through the local `client` that was null-checked above rather
+ // than the shared `_client` member, which connect() may reassign concurrently.
+ auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
*file_size = outcome.GetResult().GetContentLength();
} else {
- return Status::IOError(
- fmt::format("failed to get object size(endpoint={}, bucket={}, key={}): {}",
- _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+ return Status::IOError("failed to get object size(endpoint={}, bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, key,
+ outcome.GetError().GetMessage());
}
return Status::OK();
}
@@ -211,10 +220,10 @@ Status S3FileSystem::list(const Path& path, std::vector<Path>* files) {
std::string S3FileSystem::get_key(const Path& path) const {
StringPiece str(path.native());
if (str.starts_with(_root_path.native())) {
- return fmt::format("{}/{}", _prefix, str.data() + _root_path.native().size());
+ return fmt::format("{}/{}", _s3_conf.prefix, str.data() + _root_path.native().size());
}
// We consider it as a relative path.
- return fmt::format("{}/{}", _prefix, path.native());
+ return fmt::format("{}/{}", _s3_conf.prefix, path.native());
}
} // namespace io
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index fdfbe2f8ba..219ecb3a0d 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -35,8 +35,7 @@ namespace io {
// This class is thread-safe.(Except `set_xxx` method)
class S3FileSystem final : public RemoteFileSystem {
public:
- S3FileSystem(const std::map<std::string, std::string>& properties, std::string bucket,
- std::string prefix, ResourceId resource_id);
+ S3FileSystem(S3Conf s3_conf, ResourceId resource_id);
~S3FileSystem() override;
Status create_file(const Path& path, FileWriterPtr* writer) override;
@@ -70,19 +69,16 @@ public:
};
// Guarded by external lock.
- void set_ak(std::string ak) { _properties[S3_AK] = std::move(ak); }
+ void set_ak(std::string ak) { _s3_conf.ak = std::move(ak); }
// Guarded by external lock.
- void set_sk(std::string sk) { _properties[S3_SK] = std::move(sk); }
+ void set_sk(std::string sk) { _s3_conf.sk = std::move(sk); }
private:
std::string get_key(const Path& path) const;
private:
- std::map<std::string, std::string> _properties;
- std::string _endpoint;
- std::string _bucket;
- std::string _prefix;
+ S3Conf _s3_conf;
// FIXME(cyx): We can use std::atomic<std::shared_ptr> since c++20.
std::shared_ptr<Aws::S3::S3Client> _client;
diff --git a/be/src/olap/storage_policy_mgr.cpp b/be/src/olap/storage_policy_mgr.cpp
index f91cac76e0..57da5408f9 100644
--- a/be/src/olap/storage_policy_mgr.cpp
+++ b/be/src/olap/storage_policy_mgr.cpp
@@ -24,53 +24,62 @@
namespace doris {
-void StoragePolicyMgr::update(const std::string& name, StoragePolicyPtr policy) {
- std::lock_guard<std::mutex> l(_mutex);
- auto it = _policy_map.find(name);
- if (it != _policy_map.end()) {
- // just support change ak, sk, cooldown_ttl, cooldown_datetime
- LOG(INFO) << "update storage policy name: " << name;
- auto s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
- io::FileSystemMap::instance()->get(name));
- DCHECK(s3_fs);
- s3_fs->set_ak(policy->s3_ak);
- s3_fs->set_sk(policy->s3_sk);
- s3_fs->connect();
- it->second = std::move(policy);
- } else {
- // can't find name's policy, so do nothing.
+void StoragePolicyMgr::update(const std::string& name, const StoragePolicyPtr& policy) {
+ std::shared_ptr<io::S3FileSystem> s3_fs;
+ {
+ std::lock_guard<std::mutex> l(_mutex);
+ auto it = _policy_map.find(name);
+ if (it != _policy_map.end()) {
+ LOG(INFO) << "update storage policy name: " << name;
+ it->second = policy;
+ s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+ io::FileSystemMap::instance()->get(name));
+ DCHECK(s3_fs);
+ s3_fs->set_ak(policy->s3_ak);
+ s3_fs->set_sk(policy->s3_sk);
+ }
+ }
+ if (s3_fs) {
+ auto st = s3_fs->connect();
+ if (!st.ok()) {
+ LOG(ERROR) << st;
+ }
}
}
-void StoragePolicyMgr::periodic_put(const std::string& name, StoragePolicyPtr policy) {
- std::lock_guard<std::mutex> l(_mutex);
- auto it = _policy_map.find(name);
- if (it == _policy_map.end()) {
- LOG(INFO) << "add storage policy name: " << name << " to map";
- std::map<std::string, std::string> aws_properties = {
- {S3_AK, policy->s3_ak},
- {S3_SK, policy->s3_sk},
- {S3_ENDPOINT, policy->s3_endpoint},
- {S3_REGION, policy->s3_region},
- {S3_MAX_CONN_SIZE, std::to_string(policy->s3_max_conn)},
- {S3_REQUEST_TIMEOUT_MS, std::to_string(policy->s3_request_timeout_ms)},
- {S3_CONN_TIMEOUT_MS, std::to_string(policy->s3_conn_timeout_ms)}};
- auto s3_fs = std::make_shared<io::S3FileSystem>(aws_properties, policy->bucket,
- policy->root_path, name);
- s3_fs->connect();
- io::FileSystemMap::instance()->insert(name, std::move(s3_fs));
- _policy_map.emplace(name, std::move(policy));
- } else if (it->second->md5_sum != policy->md5_sum) {
- // fe change policy
- // just support change ak, sk, cooldown_ttl, cooldown_datetime
- LOG(INFO) << "update storage policy name: " << name;
- auto s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
- io::FileSystemMap::instance()->get(name));
- DCHECK(s3_fs);
- s3_fs->set_ak(policy->s3_ak);
- s3_fs->set_sk(policy->s3_sk);
- s3_fs->connect();
- it->second = std::move(policy);
+// Periodic pull from FE. A policy name seen for the first time gets a new S3FileSystem
+// registered in FileSystemMap and an entry in `_policy_map`; a known policy whose
+// md5_sum changed has its stored policy replaced and its ak/sk refreshed. Map updates
+// happen under `_mutex`; connect() is called only after the lock has been released.
+void StoragePolicyMgr::periodic_put(const std::string& name, const StoragePolicyPtr& policy) {
+ std::shared_ptr<io::S3FileSystem> s3_fs;
+ {
+ std::lock_guard<std::mutex> l(_mutex);
+ auto it = _policy_map.find(name);
+ if (it == _policy_map.end()) {
+ LOG(INFO) << "add storage policy name: " << name << " to map";
+ S3Conf s3_conf;
+ s3_conf.ak = policy->s3_ak;
+ s3_conf.sk = policy->s3_sk;
+ s3_conf.endpoint = policy->s3_endpoint;
+ s3_conf.region = policy->s3_region;
+ // The previous S3FileSystem constructor received bucket and root path explicitly;
+ // they must be carried in S3Conf now, otherwise the file system is created with an
+ // empty bucket and prefix.
+ s3_conf.bucket = policy->bucket;
+ s3_conf.prefix = policy->root_path;
+ s3_conf.max_connections = policy->s3_max_conn;
+ s3_conf.request_timeout_ms = policy->s3_request_timeout_ms;
+ s3_conf.connect_timeout_ms = policy->s3_conn_timeout_ms;
+ s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), name);
+ io::FileSystemMap::instance()->insert(name, s3_fs);
+ _policy_map.emplace(name, policy);
+ } else if (it->second->md5_sum != policy->md5_sum) {
+ LOG(INFO) << "update storage policy name: " << name;
+ it->second = policy;
+ s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+ io::FileSystemMap::instance()->get(name));
+ DCHECK(s3_fs);
+ s3_fs->set_ak(policy->s3_ak);
+ s3_fs->set_sk(policy->s3_sk);
+ }
+ }
+ // `s3_fs` is only set when a file system was created or its credentials changed.
+ if (s3_fs) {
+ auto st = s3_fs->connect();
+ if (!st.ok()) {
+ LOG(ERROR) << st;
+ }
+ }
}
diff --git a/be/src/olap/storage_policy_mgr.h b/be/src/olap/storage_policy_mgr.h
index 81fa15e00d..2da2c29f2d 100644
--- a/be/src/olap/storage_policy_mgr.h
+++ b/be/src/olap/storage_policy_mgr.h
@@ -58,15 +58,15 @@ class ExecEnv;
class StoragePolicyMgr {
public:
using StoragePolicyPtr = std::shared_ptr<StoragePolicy>;
- StoragePolicyMgr() {}
+ StoragePolicyMgr() = default;
~StoragePolicyMgr() = default;
// fe push update policy to be
- void update(const std::string& name, StoragePolicyPtr policy);
+ void update(const std::string& name, const StoragePolicyPtr& policy);
// periodic pull from fe
- void periodic_put(const std::string& name, StoragePolicyPtr policy);
+ void periodic_put(const std::string& name, const StoragePolicyPtr& policy);
StoragePolicyPtr get(const std::string& name);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index aa65f2110f..74aa3a8c8f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -296,8 +296,6 @@ public:
RowsetSharedPtr pick_cooldown_rowset();
- bool need_cooldown();
-
bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
// Physically remove remote rowsets.
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index bf47b4656f..8a42e9f6ba 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -96,4 +96,32 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
}
+bool ClientFactory::is_s3_conf_valid(const S3Conf& s3_conf) {
+ return !s3_conf.ak.empty() && !s3_conf.sk.empty() && !s3_conf.endpoint.empty();
+}
+
+std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(const S3Conf& s3_conf) {
+ if (!is_s3_conf_valid(s3_conf)) {
+ return nullptr;
+ }
+ Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
+ DCHECK(!aws_cred.IsExpiredOrEmpty());
+
+ Aws::Client::ClientConfiguration aws_config;
+ aws_config.endpointOverride = s3_conf.endpoint;
+ aws_config.region = s3_conf.region;
+ if (s3_conf.max_connections > 0) {
+ aws_config.maxConnections = s3_conf.max_connections;
+ }
+ if (s3_conf.request_timeout_ms > 0) {
+ aws_config.requestTimeoutMs = s3_conf.request_timeout_ms;
+ }
+ if (s3_conf.connect_timeout_ms > 0) {
+ aws_config.connectTimeoutMs = s3_conf.connect_timeout_ms;
+ }
+ return std::make_shared<Aws::S3::S3Client>(
+ std::move(aws_cred), std::move(aws_config),
+ Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never);
+}
+
} // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index fd4dd17b75..74c723bbec 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -39,6 +39,29 @@ const static std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
const static std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
const static std::string S3_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
+struct S3Conf {
+ std::string ak;
+ std::string sk;
+ std::string endpoint;
+ std::string region;
+ std::string bucket;
+ std::string prefix;
+ int max_connections = -1;
+ int request_timeout_ms = -1;
+ int connect_timeout_ms = -1;
+
+ std::string to_string() const;
+};
+
+// Renders the configuration as a single line for logs and error messages.
+// The secret key is never printed: only "<empty>" or a fixed mask is emitted, because
+// this string ends up in Status messages that are written to the log.
+inline std::string S3Conf::to_string() const {
+ std::stringstream ss;
+ ss << "ak: " << ak << ", sk: " << (sk.empty() ? "<empty>" : "******")
+ << ", endpoint: " << endpoint << ", region: " << region
+ << ", bucket: " << bucket << ", prefix: " << prefix
+ << ", max_connections: " << max_connections << ", request_timeout_ms: " << request_timeout_ms
+ << ", connect_timeout_ms: " << connect_timeout_ms;
+ return ss.str();
+}
+
class ClientFactory {
public:
~ClientFactory();
@@ -47,13 +70,16 @@ public:
std::shared_ptr<Aws::S3::S3Client> create(const std::map<std::string, std::string>& prop);
+ std::shared_ptr<Aws::S3::S3Client> create(const S3Conf& s3_conf);
+
static bool is_s3_conf_valid(const std::map<std::string, std::string>& prop);
+ static bool is_s3_conf_valid(const S3Conf& s3_conf);
+
private:
ClientFactory();
Aws::SDKOptions _aws_options;
};
-std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string, std::string>& prop);
} // end namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index cdc39b7436..9d2f97e7cb 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -194,7 +194,6 @@ set(OLAP_TEST_FILES
# olap/memtable_flush_executor_test.cpp
# olap/push_handler_test.cpp
olap/tablet_cooldown_test.cpp
- olap/tablet_clone_test.cpp
)
set(RUNTIME_TEST_FILES
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index d0b4a785f6..3607272e9b 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -421,15 +421,16 @@ class S3ClientMockGetErrorData : public S3ClientMock {
TEST_F(BetaRowsetTest, ReadTest) {
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
BetaRowset rowset(nullptr, "", rowset_meta);
- std::map<std::string, std::string> properties {
- {"AWS_ACCESS_KEY", "ak"},
- {"AWS_SECRET_KEY", "ak"},
- {"AWS_ENDPOINT", "endpoint"},
- {"AWS_REGION", "region"},
- };
+ S3Conf s3_conf;
+ s3_conf.ak = "ak";
+ s3_conf.sk = "sk";
+ s3_conf.endpoint = "endpoint";
+ s3_conf.region = "region";
+ s3_conf.bucket = "bucket";
+ s3_conf.prefix = "prefix";
io::ResourceId resource_id = "test_resourse_id";
std::shared_ptr<io::S3FileSystem> fs =
- std::make_shared<io::S3FileSystem>(properties, "bucket", "test prefix", resource_id);
+ std::make_shared<io::S3FileSystem>(std::move(s3_conf), resource_id);
Aws::SDKOptions aws_options = Aws::SDKOptions {};
Aws::InitAPI(aws_options);
// failed to head object
diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/tablet_clone_test.cpp
index 64490d0011..135ed47c77 100644
--- a/be/test/olap/tablet_clone_test.cpp
+++ b/be/test/olap/tablet_clone_test.cpp
@@ -54,10 +54,15 @@ static const std::string PREFIX = "prefix";
class TabletCloneTest : public testing::Test {
public:
static void SetUpTestSuite() {
- std::map<std::string, std::string> properties = {
- {S3_AK, AK}, {S3_SK, SK}, {S3_ENDPOINT, ENDPOINT}, {S3_REGION, REGION}};
- auto s3_fs = std::make_shared<io::S3FileSystem>(properties, BUCKET, PREFIX, kResourceId);
- s3_fs->connect();
+ S3Conf s3_conf;
+ s3_conf.ak = AK;
+ s3_conf.sk = SK;
+ s3_conf.endpoint = ENDPOINT;
+ s3_conf.region = REGION;
+ s3_conf.bucket = BUCKET;
+ s3_conf.prefix = PREFIX;
+ auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
+ ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
config::storage_root_path = kTestDir;
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index d4d8acbbb0..a7308062dd 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -51,27 +51,31 @@ static const std::string PREFIX = "tablet_cooldown_test";
class TabletCooldownTest : public testing::Test {
public:
static void SetUpTestSuite() {
- std::map<std::string, std::string> properties = {
- {S3_AK, AK}, {S3_SK, SK}, {S3_ENDPOINT, ENDPOINT}, {S3_REGION, REGION}};
- auto s3_fs = std::make_shared<io::S3FileSystem>(properties, BUCKET, PREFIX, kResourceId);
- s3_fs->connect();
+ S3Conf s3_conf;
+ s3_conf.ak = AK;
+ s3_conf.sk = SK;
+ s3_conf.endpoint = ENDPOINT;
+ s3_conf.region = REGION;
+ s3_conf.bucket = BUCKET;
+ s3_conf.prefix = PREFIX;
+ auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
+ ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
- config::storage_root_path = kTestDir;
+ constexpr uint32_t MAX_PATH_LEN = 1024;
+ char buffer[MAX_PATH_LEN];
+ EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+ config::storage_root_path = std::string(buffer) + "/" + kTestDir;
config::min_file_descriptor_number = 1000;
- FileUtils::remove_all(kTestDir);
- FileUtils::create_dir(kTestDir);
+ FileUtils::remove_all(config::storage_root_path);
+ FileUtils::create_dir(config::storage_root_path);
- std::vector<StorePath> paths {{kTestDir, -1}};
+ std::vector<StorePath> paths {{config::storage_root_path, -1}};
EngineOptions options;
options.store_paths = paths;
-
- ExecEnv::GetInstance()->_storage_policy_mgr = new StoragePolicyMgr();
-
doris::StorageEngine::open(options, &k_engine);
- k_engine->start_bg_threads();
}
static void TearDownTestSuite() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org