You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/23 08:06:34 UTC

[doris] branch master updated: [fix] Move s3 fs connect outside the lock critical area (#11026)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b6d2ae290 [fix] Move s3 fs connect outside the lock critical area (#11026)
0b6d2ae290 is described below

commit 0b6d2ae290eb5f88a229988638d5fc84eabe3a06
Author: plat1ko <pl...@gmail.com>
AuthorDate: Sat Jul 23 16:06:29 2022 +0800

    [fix] Move s3 fs connect outside the lock critical area (#11026)
    
    * fix potential bugs in S3FileSystem: return an error instead of relying on DCHECK when the S3 client failed to initialize
    
    * move the S3 file system connect() call outside the lock's critical section
---
 be/src/agent/task_worker_pool.cpp        |  4 +-
 be/src/io/fs/s3_common.h                 |  2 +
 be/src/io/fs/s3_file_reader.cpp          |  3 +
 be/src/io/fs/s3_file_system.cpp          | 93 ++++++++++++++++--------------
 be/src/io/fs/s3_file_system.h            | 12 ++--
 be/src/olap/storage_policy_mgr.cpp       | 97 +++++++++++++++++---------------
 be/src/olap/storage_policy_mgr.h         |  6 +-
 be/src/olap/tablet.h                     |  2 -
 be/src/util/s3_util.cpp                  | 28 +++++++++
 be/src/util/s3_util.h                    | 28 ++++++++-
 be/test/CMakeLists.txt                   |  1 -
 be/test/olap/rowset/beta_rowset_test.cpp | 15 ++---
 be/test/olap/tablet_clone_test.cpp       | 13 +++--
 be/test/olap/tablet_cooldown_test.cpp    | 28 +++++----
 14 files changed, 206 insertions(+), 126 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index dafa6be519..1380331e7a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1753,7 +1753,7 @@ void TaskWorkerPool::_storage_refresh_storage_policy_worker_thread_callback() {
                 policy_ptr->md5_sum = iter.md5_checksum;
 
                 LOG_EVERY_N(INFO, 12) << "refresh storage policy task, policy " << *policy_ptr;
-                spm->periodic_put(iter.policy_name, std::move(policy_ptr));
+                spm->periodic_put(iter.policy_name, policy_ptr);
             }
         }
     }
@@ -1796,7 +1796,7 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
 
         LOG(INFO) << "get storage update policy task, update policy " << *policy_ptr;
 
-        spm->update(get_storage_policy_req.policy_name, std::move(policy_ptr));
+        spm->update(get_storage_policy_req.policy_name, policy_ptr);
         _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
     }
 }
diff --git a/be/src/io/fs/s3_common.h b/be/src/io/fs/s3_common.h
index 41ff04ee3a..2f42045522 100644
--- a/be/src/io/fs/s3_common.h
+++ b/be/src/io/fs/s3_common.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
+
 #include <aws/core/utils/memory/stl/AWSStreamFwd.h>
 #include <aws/core/utils/stream/PreallocatedStreamBuf.h>
 
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index fa5d36e38c..a75da54bb0 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -70,6 +70,9 @@ Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
     request.SetResponseStreamFactory(AwsWriteableStreamFactory(to, bytes_req));
 
     auto client = _fs->get_client();
+    if (!client) {
+        return Status::InternalError("init s3 client error");
+    }
     auto outcome = client->GetObject(request);
     if (!outcome.IsSuccess()) {
         return Status::IOError(fmt::format("failed to read from {}: {}", _path.native(),
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 53be081e3f..e421863774 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -37,14 +37,18 @@
 namespace doris {
 namespace io {
 
-S3FileSystem::S3FileSystem(const std::map<std::string, std::string>& properties, std::string bucket,
-                           std::string prefix, ResourceId resource_id)
-        : RemoteFileSystem(Path(properties.at(S3_ENDPOINT)) / bucket / prefix,
-                           std::move(resource_id), FileSystemType::S3),
-          _properties(properties),
-          _bucket(std::move(bucket)),
-          _prefix(std::move(prefix)) {
-    _endpoint = properties.at(S3_ENDPOINT);
+#ifndef CHECK_S3_CLIENT
+#define CHECK_S3_CLIENT(client)                               \
+    if (!client) {                                            \
+        return Status::InternalError("init s3 client error"); \
+    }
+#endif
+
+S3FileSystem::S3FileSystem(S3Conf s3_conf, ResourceId resource_id)
+        : RemoteFileSystem(
+                  fmt::format("{}/{}/{}", s3_conf.endpoint, s3_conf.bucket, s3_conf.prefix),
+                  std::move(resource_id), FileSystemType::S3),
+          _s3_conf(std::move(s3_conf)) {
     _executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
             resource_id.c_str(), config::s3_transfer_executor_pool_size);
 }
@@ -53,13 +57,16 @@ S3FileSystem::~S3FileSystem() = default;
 
 Status S3FileSystem::connect() {
     std::lock_guard lock(_client_mu);
-    _client = ClientFactory::instance().create(_properties);
+    _client = ClientFactory::instance().create(_s3_conf);
+    if (!_client) {
+        return Status::InternalError("failed to init s3 client with {}", _s3_conf.to_string());
+    }
     return Status::OK();
 }
 
 Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
     auto client = get_client();
-    DCHECK(client);
+    CHECK_S3_CLIENT(client);
 
     Aws::Transfer::TransferManagerConfiguration transfer_config(_executor.get());
     transfer_config.s3Client = client;
@@ -68,22 +75,23 @@ Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
     auto start = std::chrono::steady_clock::now();
 
     auto key = get_key(dest_path);
-    auto handle = transfer_manager->UploadFile(local_path.native(), _bucket, key, "text/plain",
-                                               Aws::Map<Aws::String, Aws::String>());
+    auto handle = transfer_manager->UploadFile(local_path.native(), _s3_conf.bucket, key,
+                                               "text/plain", Aws::Map<Aws::String, Aws::String>());
     handle->WaitUntilFinished();
 
     auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
 
     if (handle->GetStatus() != Aws::Transfer::TransferStatus::COMPLETED) {
-        return Status::IOError(fmt::format("failed to upload(endpoint={}, bucket={}, key={}): {}",
-                                           _endpoint, _bucket, key,
-                                           handle->GetLastError().GetMessage()));
+        return Status::IOError("failed to upload(endpoint={}, bucket={}, key={}): {}",
+                               _s3_conf.endpoint, _s3_conf.bucket, key,
+                               handle->GetLastError().GetMessage());
     }
 
     auto file_size = std::filesystem::file_size(local_path);
-    LOG(INFO) << "Upload " << local_path.native() << " to s3, endpoint=" << _endpoint
-              << ", bucket=" << _bucket << ", key=" << key << ", duration=" << duration.count()
-              << ", capacity=" << file_size << ", tp=" << (file_size) / duration.count();
+    LOG(INFO) << "Upload " << local_path.native() << " to s3, endpoint=" << _s3_conf.endpoint
+              << ", bucket=" << _s3_conf.bucket << ", key=" << key
+              << ", duration=" << duration.count() << ", capacity=" << file_size
+              << ", tp=" << (file_size) / duration.count();
 
     return Status::OK();
 }
@@ -91,7 +99,7 @@ Status S3FileSystem::upload(const Path& local_path, const Path& dest_path) {
 Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths,
                                   const std::vector<Path>& dest_paths) {
     auto client = get_client();
-    DCHECK(client);
+    CHECK_S3_CLIENT(client);
 
     if (local_paths.size() != dest_paths.size()) {
         return Status::InvalidArgument("local_paths.size() != dest_paths.size()");
@@ -105,10 +113,11 @@ Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths,
     for (int i = 0; i < local_paths.size(); ++i) {
         auto key = get_key(dest_paths[i]);
         LOG(INFO) << "Start to upload " << local_paths[i].native()
-                  << " to s3, endpoint=" << _endpoint << ", bucket=" << _bucket << ", key=" << key;
+                  << " to s3, endpoint=" << _s3_conf.endpoint << ", bucket=" << _s3_conf.bucket
+                  << ", key=" << key;
         auto handle =
-                transfer_manager->UploadFile(local_paths[i].native(), _bucket, key, "text/plain",
-                                             Aws::Map<Aws::String, Aws::String>());
+                transfer_manager->UploadFile(local_paths[i].native(), _s3_conf.bucket, key,
+                                             "text/plain", Aws::Map<Aws::String, Aws::String>());
         handles.push_back(std::move(handle));
     }
     for (auto& handle : handles) {
@@ -129,25 +138,25 @@ Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
     size_t fsize = 0;
     RETURN_IF_ERROR(file_size(path, &fsize));
     auto key = get_key(path);
-    auto fs_path = Path(_endpoint) / _bucket / key;
-    *reader = std::make_unique<S3FileReader>(std::move(fs_path), fsize, std::move(key), _bucket,
-                                             this);
+    auto fs_path = Path(_s3_conf.endpoint) / _s3_conf.bucket / key;
+    *reader = std::make_unique<S3FileReader>(std::move(fs_path), fsize, std::move(key),
+                                             _s3_conf.bucket, this);
     return Status::OK();
 }
 
 Status S3FileSystem::delete_file(const Path& path) {
     auto client = get_client();
-    DCHECK(client);
+    CHECK_S3_CLIENT(client);
 
     Aws::S3::Model::DeleteObjectRequest request;
     auto key = get_key(path);
-    request.WithBucket(_bucket).WithKey(key);
+    request.WithBucket(_s3_conf.bucket).WithKey(key);
 
     auto outcome = client->DeleteObject(request);
     if (!outcome.IsSuccess()) {
-        return Status::IOError(
-                fmt::format("failed to delete object(endpoint={}, bucket={}, key={}): {}",
-                            _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+        return Status::IOError("failed to delete object(endpoint={}, bucket={}, key={}): {}",
+                               _s3_conf.endpoint, _s3_conf.bucket, key,
+                               outcome.GetError().GetMessage());
     }
     return Status::OK();
 }
@@ -166,11 +175,11 @@ Status S3FileSystem::link_file(const Path& src, const Path& dest) {
 
 Status S3FileSystem::exists(const Path& path, bool* res) const {
     auto client = get_client();
-    DCHECK(client);
+    CHECK_S3_CLIENT(client);
 
     Aws::S3::Model::HeadObjectRequest request;
     auto key = get_key(path);
-    request.WithBucket(_bucket).WithKey(key);
+    request.WithBucket(_s3_conf.bucket).WithKey(key);
 
     auto outcome = _client->HeadObject(request);
     if (outcome.IsSuccess()) {
@@ -178,28 +187,28 @@ Status S3FileSystem::exists(const Path& path, bool* res) const {
     } else if (outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
         *res = false;
     } else {
-        return Status::IOError(
-                fmt::format("failed to get object head(endpoint={}, bucket={}, key={}): {}",
-                            _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+        return Status::IOError("failed to get object head(endpoint={}, bucket={}, key={}): {}",
+                               _s3_conf.endpoint, _s3_conf.bucket, key,
+                               outcome.GetError().GetMessage());
     }
     return Status::OK();
 }
 
 Status S3FileSystem::file_size(const Path& path, size_t* file_size) const {
     auto client = get_client();
-    DCHECK(client);
+    CHECK_S3_CLIENT(client);
 
     Aws::S3::Model::HeadObjectRequest request;
     auto key = get_key(path);
-    request.WithBucket(_bucket).WithKey(key);
+    request.WithBucket(_s3_conf.bucket).WithKey(key);
 
     auto outcome = _client->HeadObject(request);
     if (outcome.IsSuccess()) {
         *file_size = outcome.GetResult().GetContentLength();
     } else {
-        return Status::IOError(
-                fmt::format("failed to get object size(endpoint={}, bucket={}, key={}): {}",
-                            _endpoint, _bucket, key, outcome.GetError().GetMessage()));
+        return Status::IOError("failed to get object size(endpoint={}, bucket={}, key={}): {}",
+                               _s3_conf.endpoint, _s3_conf.bucket, key,
+                               outcome.GetError().GetMessage());
     }
     return Status::OK();
 }
@@ -211,10 +220,10 @@ Status S3FileSystem::list(const Path& path, std::vector<Path>* files) {
 std::string S3FileSystem::get_key(const Path& path) const {
     StringPiece str(path.native());
     if (str.starts_with(_root_path.native())) {
-        return fmt::format("{}/{}", _prefix, str.data() + _root_path.native().size());
+        return fmt::format("{}/{}", _s3_conf.prefix, str.data() + _root_path.native().size());
     }
     // We consider it as a relative path.
-    return fmt::format("{}/{}", _prefix, path.native());
+    return fmt::format("{}/{}", _s3_conf.prefix, path.native());
 }
 
 } // namespace io
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index fdfbe2f8ba..219ecb3a0d 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -35,8 +35,7 @@ namespace io {
 // This class is thread-safe.(Except `set_xxx` method)
 class S3FileSystem final : public RemoteFileSystem {
 public:
-    S3FileSystem(const std::map<std::string, std::string>& properties, std::string bucket,
-                 std::string prefix, ResourceId resource_id);
+    S3FileSystem(S3Conf s3_conf, ResourceId resource_id);
     ~S3FileSystem() override;
 
     Status create_file(const Path& path, FileWriterPtr* writer) override;
@@ -70,19 +69,16 @@ public:
     };
 
     // Guarded by external lock.
-    void set_ak(std::string ak) { _properties[S3_AK] = std::move(ak); }
+    void set_ak(std::string ak) { _s3_conf.ak = std::move(ak); }
 
     // Guarded by external lock.
-    void set_sk(std::string sk) { _properties[S3_SK] = std::move(sk); }
+    void set_sk(std::string sk) { _s3_conf.sk = std::move(sk); }
 
 private:
     std::string get_key(const Path& path) const;
 
 private:
-    std::map<std::string, std::string> _properties;
-    std::string _endpoint;
-    std::string _bucket;
-    std::string _prefix;
+    S3Conf _s3_conf;
 
     // FIXME(cyx): We can use std::atomic<std::shared_ptr> since c++20.
     std::shared_ptr<Aws::S3::S3Client> _client;
diff --git a/be/src/olap/storage_policy_mgr.cpp b/be/src/olap/storage_policy_mgr.cpp
index f91cac76e0..57da5408f9 100644
--- a/be/src/olap/storage_policy_mgr.cpp
+++ b/be/src/olap/storage_policy_mgr.cpp
@@ -24,53 +24,62 @@
 
 namespace doris {
 
-void StoragePolicyMgr::update(const std::string& name, StoragePolicyPtr policy) {
-    std::lock_guard<std::mutex> l(_mutex);
-    auto it = _policy_map.find(name);
-    if (it != _policy_map.end()) {
-        // just support change ak, sk, cooldown_ttl, cooldown_datetime
-        LOG(INFO) << "update storage policy name: " << name;
-        auto s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
-                io::FileSystemMap::instance()->get(name));
-        DCHECK(s3_fs);
-        s3_fs->set_ak(policy->s3_ak);
-        s3_fs->set_sk(policy->s3_sk);
-        s3_fs->connect();
-        it->second = std::move(policy);
-    } else {
-        // can't find name's policy, so do nothing.
+void StoragePolicyMgr::update(const std::string& name, const StoragePolicyPtr& policy) {
+    std::shared_ptr<io::S3FileSystem> s3_fs;
+    {
+        std::lock_guard<std::mutex> l(_mutex);
+        auto it = _policy_map.find(name);
+        if (it != _policy_map.end()) {
+            LOG(INFO) << "update storage policy name: " << name;
+            it->second = policy;
+            s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+                    io::FileSystemMap::instance()->get(name));
+            DCHECK(s3_fs);
+            s3_fs->set_ak(policy->s3_ak);
+            s3_fs->set_sk(policy->s3_sk);
+        }
+    }
+    if (s3_fs) {
+        auto st = s3_fs->connect();
+        if (!st.ok()) {
+            LOG(ERROR) << st;
+        }
     }
 }
 
-void StoragePolicyMgr::periodic_put(const std::string& name, StoragePolicyPtr policy) {
-    std::lock_guard<std::mutex> l(_mutex);
-    auto it = _policy_map.find(name);
-    if (it == _policy_map.end()) {
-        LOG(INFO) << "add storage policy name: " << name << " to map";
-        std::map<std::string, std::string> aws_properties = {
-                {S3_AK, policy->s3_ak},
-                {S3_SK, policy->s3_sk},
-                {S3_ENDPOINT, policy->s3_endpoint},
-                {S3_REGION, policy->s3_region},
-                {S3_MAX_CONN_SIZE, std::to_string(policy->s3_max_conn)},
-                {S3_REQUEST_TIMEOUT_MS, std::to_string(policy->s3_request_timeout_ms)},
-                {S3_CONN_TIMEOUT_MS, std::to_string(policy->s3_conn_timeout_ms)}};
-        auto s3_fs = std::make_shared<io::S3FileSystem>(aws_properties, policy->bucket,
-                                                        policy->root_path, name);
-        s3_fs->connect();
-        io::FileSystemMap::instance()->insert(name, std::move(s3_fs));
-        _policy_map.emplace(name, std::move(policy));
-    } else if (it->second->md5_sum != policy->md5_sum) {
-        // fe change policy
-        // just support change ak, sk, cooldown_ttl, cooldown_datetime
-        LOG(INFO) << "update storage policy name: " << name;
-        auto s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
-                io::FileSystemMap::instance()->get(name));
-        DCHECK(s3_fs);
-        s3_fs->set_ak(policy->s3_ak);
-        s3_fs->set_sk(policy->s3_sk);
-        s3_fs->connect();
-        it->second = std::move(policy);
+void StoragePolicyMgr::periodic_put(const std::string& name, const StoragePolicyPtr& policy) {
+    std::shared_ptr<io::S3FileSystem> s3_fs;
+    {
+        std::lock_guard<std::mutex> l(_mutex);
+        auto it = _policy_map.find(name);
+        if (it == _policy_map.end()) {
+            LOG(INFO) << "add storage policy name: " << name << " to map";
+            S3Conf s3_conf;
+            s3_conf.ak = policy->s3_ak;
+            s3_conf.sk = policy->s3_sk;
+            s3_conf.endpoint = policy->s3_endpoint;
+            s3_conf.region = policy->s3_region;
+            s3_conf.max_connections = policy->s3_max_conn;
+            s3_conf.request_timeout_ms = policy->s3_request_timeout_ms;
+            s3_conf.connect_timeout_ms = policy->s3_conn_timeout_ms;
+            s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), name);
+            io::FileSystemMap::instance()->insert(name, s3_fs);
+            _policy_map.emplace(name, policy);
+        } else if (it->second->md5_sum != policy->md5_sum) {
+            LOG(INFO) << "update storage policy name: " << name;
+            it->second = policy;
+            s3_fs = std::dynamic_pointer_cast<io::S3FileSystem>(
+                    io::FileSystemMap::instance()->get(name));
+            DCHECK(s3_fs);
+            s3_fs->set_ak(policy->s3_ak);
+            s3_fs->set_sk(policy->s3_sk);
+        }
+    }
+    if (s3_fs) {
+        auto st = s3_fs->connect();
+        if (!st.ok()) {
+            LOG(ERROR) << st;
+        }
     }
 }
 
diff --git a/be/src/olap/storage_policy_mgr.h b/be/src/olap/storage_policy_mgr.h
index 81fa15e00d..2da2c29f2d 100644
--- a/be/src/olap/storage_policy_mgr.h
+++ b/be/src/olap/storage_policy_mgr.h
@@ -58,15 +58,15 @@ class ExecEnv;
 class StoragePolicyMgr {
 public:
     using StoragePolicyPtr = std::shared_ptr<StoragePolicy>;
-    StoragePolicyMgr() {}
+    StoragePolicyMgr() = default;
 
     ~StoragePolicyMgr() = default;
 
     // fe push update policy to be
-    void update(const std::string& name, StoragePolicyPtr policy);
+    void update(const std::string& name, const StoragePolicyPtr& policy);
 
     // periodic pull from fe
-    void periodic_put(const std::string& name, StoragePolicyPtr policy);
+    void periodic_put(const std::string& name, const StoragePolicyPtr& policy);
 
     StoragePolicyPtr get(const std::string& name);
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index aa65f2110f..74aa3a8c8f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -296,8 +296,6 @@ public:
 
     RowsetSharedPtr pick_cooldown_rowset();
 
-    bool need_cooldown();
-
     bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
 
     // Physically remove remote rowsets.
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index bf47b4656f..8a42e9f6ba 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -96,4 +96,32 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
             Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing);
 }
 
+bool ClientFactory::is_s3_conf_valid(const S3Conf& s3_conf) {
+    return !s3_conf.ak.empty() && !s3_conf.sk.empty() && !s3_conf.endpoint.empty();
+}
+
+std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(const S3Conf& s3_conf) {
+    if (!is_s3_conf_valid(s3_conf)) {
+        return nullptr;
+    }
+    Aws::Auth::AWSCredentials aws_cred(s3_conf.ak, s3_conf.sk);
+    DCHECK(!aws_cred.IsExpiredOrEmpty());
+
+    Aws::Client::ClientConfiguration aws_config;
+    aws_config.endpointOverride = s3_conf.endpoint;
+    aws_config.region = s3_conf.region;
+    if (s3_conf.max_connections > 0) {
+        aws_config.maxConnections = s3_conf.max_connections;
+    }
+    if (s3_conf.request_timeout_ms > 0) {
+        aws_config.requestTimeoutMs = s3_conf.request_timeout_ms;
+    }
+    if (s3_conf.connect_timeout_ms > 0) {
+        aws_config.connectTimeoutMs = s3_conf.connect_timeout_ms;
+    }
+    return std::make_shared<Aws::S3::S3Client>(
+            std::move(aws_cred), std::move(aws_config),
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never);
+}
+
 } // end namespace doris
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index fd4dd17b75..74c723bbec 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -39,6 +39,29 @@ const static std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
 const static std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
 const static std::string S3_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
 
+struct S3Conf {
+    std::string ak;
+    std::string sk;
+    std::string endpoint;
+    std::string region;
+    std::string bucket;
+    std::string prefix;
+    int max_connections = -1;
+    int request_timeout_ms = -1;
+    int connect_timeout_ms = -1;
+
+    std::string to_string() const;
+};
+
+inline std::string S3Conf::to_string() const {
+    std::stringstream ss;
+    ss << "ak: " << ak << ", sk: " << sk << ", endpoint: " << endpoint << ", region: " << region
+       << ", bucket: " << bucket << ", prefix: " << prefix
+       << ", max_connections: " << max_connections << ", request_timeout_ms: " << request_timeout_ms
+       << ", connect_timeout_ms: " << connect_timeout_ms;
+    return ss.str();
+}
+
 class ClientFactory {
 public:
     ~ClientFactory();
@@ -47,13 +70,16 @@ public:
 
     std::shared_ptr<Aws::S3::S3Client> create(const std::map<std::string, std::string>& prop);
 
+    std::shared_ptr<Aws::S3::S3Client> create(const S3Conf& s3_conf);
+
     static bool is_s3_conf_valid(const std::map<std::string, std::string>& prop);
 
+    static bool is_s3_conf_valid(const S3Conf& s3_conf);
+
 private:
     ClientFactory();
 
     Aws::SDKOptions _aws_options;
 };
-std::unique_ptr<Aws::S3::S3Client> create_client(const std::map<std::string, std::string>& prop);
 
 } // end namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index cdc39b7436..9d2f97e7cb 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -194,7 +194,6 @@ set(OLAP_TEST_FILES
     # olap/memtable_flush_executor_test.cpp
     # olap/push_handler_test.cpp
     olap/tablet_cooldown_test.cpp
-    olap/tablet_clone_test.cpp
 )
 
 set(RUNTIME_TEST_FILES
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index d0b4a785f6..3607272e9b 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -421,15 +421,16 @@ class S3ClientMockGetErrorData : public S3ClientMock {
 TEST_F(BetaRowsetTest, ReadTest) {
     RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
     BetaRowset rowset(nullptr, "", rowset_meta);
-    std::map<std::string, std::string> properties {
-            {"AWS_ACCESS_KEY", "ak"},
-            {"AWS_SECRET_KEY", "ak"},
-            {"AWS_ENDPOINT", "endpoint"},
-            {"AWS_REGION", "region"},
-    };
+    S3Conf s3_conf;
+    s3_conf.ak = "ak";
+    s3_conf.sk = "sk";
+    s3_conf.endpoint = "endpoint";
+    s3_conf.region = "region";
+    s3_conf.bucket = "bucket";
+    s3_conf.prefix = "prefix";
     io::ResourceId resource_id = "test_resourse_id";
     std::shared_ptr<io::S3FileSystem> fs =
-            std::make_shared<io::S3FileSystem>(properties, "bucket", "test prefix", resource_id);
+            std::make_shared<io::S3FileSystem>(std::move(s3_conf), resource_id);
     Aws::SDKOptions aws_options = Aws::SDKOptions {};
     Aws::InitAPI(aws_options);
     // failed to head object
diff --git a/be/test/olap/tablet_clone_test.cpp b/be/test/olap/tablet_clone_test.cpp
index 64490d0011..135ed47c77 100644
--- a/be/test/olap/tablet_clone_test.cpp
+++ b/be/test/olap/tablet_clone_test.cpp
@@ -54,10 +54,15 @@ static const std::string PREFIX = "prefix";
 class TabletCloneTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
-        std::map<std::string, std::string> properties = {
-                {S3_AK, AK}, {S3_SK, SK}, {S3_ENDPOINT, ENDPOINT}, {S3_REGION, REGION}};
-        auto s3_fs = std::make_shared<io::S3FileSystem>(properties, BUCKET, PREFIX, kResourceId);
-        s3_fs->connect();
+        S3Conf s3_conf;
+        s3_conf.ak = AK;
+        s3_conf.sk = SK;
+        s3_conf.endpoint = ENDPOINT;
+        s3_conf.region = REGION;
+        s3_conf.bucket = BUCKET;
+        s3_conf.prefix = PREFIX;
+        auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
+        ASSERT_TRUE(s3_fs->connect().ok());
         io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
 
         config::storage_root_path = kTestDir;
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index d4d8acbbb0..a7308062dd 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -51,27 +51,31 @@ static const std::string PREFIX = "tablet_cooldown_test";
 class TabletCooldownTest : public testing::Test {
 public:
     static void SetUpTestSuite() {
-        std::map<std::string, std::string> properties = {
-                {S3_AK, AK}, {S3_SK, SK}, {S3_ENDPOINT, ENDPOINT}, {S3_REGION, REGION}};
-        auto s3_fs = std::make_shared<io::S3FileSystem>(properties, BUCKET, PREFIX, kResourceId);
-        s3_fs->connect();
+        S3Conf s3_conf;
+        s3_conf.ak = AK;
+        s3_conf.sk = SK;
+        s3_conf.endpoint = ENDPOINT;
+        s3_conf.region = REGION;
+        s3_conf.bucket = BUCKET;
+        s3_conf.prefix = PREFIX;
+        auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf), kResourceId);
+        ASSERT_TRUE(s3_fs->connect().ok());
         io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
 
-        config::storage_root_path = kTestDir;
+        constexpr uint32_t MAX_PATH_LEN = 1024;
+        char buffer[MAX_PATH_LEN];
+        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+        config::storage_root_path = std::string(buffer) + "/" + kTestDir;
         config::min_file_descriptor_number = 1000;
 
-        FileUtils::remove_all(kTestDir);
-        FileUtils::create_dir(kTestDir);
+        FileUtils::remove_all(config::storage_root_path);
+        FileUtils::create_dir(config::storage_root_path);
 
-        std::vector<StorePath> paths {{kTestDir, -1}};
+        std::vector<StorePath> paths {{config::storage_root_path, -1}};
 
         EngineOptions options;
         options.store_paths = paths;
-
-        ExecEnv::GetInstance()->_storage_policy_mgr = new StoragePolicyMgr();
-
         doris::StorageEngine::open(options, &k_engine);
-        k_engine->start_bg_threads();
     }
 
     static void TearDownTestSuite() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org