You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/22 14:59:52 UTC
[incubator-doris] branch master updated: [feature-wip](remote storage)(step1) use a struct instead of string for parameter path, add basic remote method (#7098)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 20ef8a6 [feature-wip](remote storage)(step1) use a struct instead of string for parameter path, add basic remote method (#7098)
20ef8a6 is described below
commit 20ef8a6e212aa3ddabe5f5a0f16ea36f7bc3e933
Author: pengxiangyu <di...@163.com>
AuthorDate: Wed Dec 22 22:58:23 2021 +0800
[feature-wip](remote storage)(step1) use a struct instead of string for parameter path, add basic remote method (#7098)
First, we need a parameter that describes whether the data is local or remote.
Then, we need to add some basic functions to support operations on remote storage.
---
be/src/agent/task_worker_pool.cpp | 6 +-
be/src/common/config.h | 8 +
be/src/env/CMakeLists.txt | 2 +
be/src/{olap/fs/fs_util.cpp => env/env.cpp} | 43 +-
be/src/env/env.h | 121 +++++-
be/src/env/env_posix.cpp | 462 +++++++++++++--------
be/src/env/env_posix.h | 101 +++++
be/src/env/env_remote.cpp | 353 ++++++++++++++++
be/src/env/env_remote.h | 107 +++++
be/src/olap/base_tablet.cpp | 14 +-
be/src/olap/base_tablet.h | 8 +-
be/src/olap/compaction.cpp | 2 +-
be/src/olap/data_dir.cpp | 315 ++++++--------
be/src/olap/data_dir.h | 25 +-
be/src/olap/delta_writer.cpp | 2 +-
be/src/olap/fs/CMakeLists.txt | 1 +
be/src/olap/fs/block_manager.h | 17 +-
be/src/olap/fs/file_block_manager.cpp | 87 ++--
be/src/olap/fs/file_block_manager.h | 17 +-
be/src/olap/fs/fs_util.cpp | 30 +-
be/src/olap/fs/fs_util.h | 6 +-
be/src/olap/fs/remote_block_manager.cpp | 339 +++++++++++++++
be/src/olap/fs/remote_block_manager.h | 70 ++++
be/src/olap/olap_common.h | 3 +-
be/src/olap/olap_define.h | 1 +
be/src/olap/olap_server.cpp | 6 +
be/src/olap/options.h | 1 +
be/src/olap/push_handler.cpp | 4 +-
be/src/olap/rowset/alpha_rowset.cpp | 12 +-
be/src/olap/rowset/alpha_rowset.h | 4 +-
be/src/olap/rowset/alpha_rowset_writer.cpp | 10 +-
be/src/olap/rowset/beta_rowset.cpp | 106 +++--
be/src/olap/rowset/beta_rowset.h | 8 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 21 +-
be/src/olap/rowset/rowset.cpp | 4 +-
be/src/olap/rowset/rowset.h | 11 +-
be/src/olap/rowset/rowset_converter.cpp | 14 +-
be/src/olap/rowset/rowset_converter.h | 8 +-
be/src/olap/rowset/rowset_factory.cpp | 6 +-
be/src/olap/rowset/rowset_factory.h | 2 +-
be/src/olap/rowset/rowset_writer_context.h | 3 +-
be/src/olap/rowset/segment_group.cpp | 4 +-
.../olap/rowset/segment_v2/bitmap_index_reader.cpp | 4 +-
.../olap/rowset/segment_v2/bitmap_index_reader.h | 6 +-
.../segment_v2/bloom_filter_index_reader.cpp | 2 +-
.../rowset/segment_v2/bloom_filter_index_reader.h | 6 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 28 +-
be/src/olap/rowset/segment_v2/column_reader.h | 6 +-
.../rowset/segment_v2/indexed_column_reader.cpp | 4 +-
.../olap/rowset/segment_v2/indexed_column_reader.h | 13 +-
.../olap/rowset/segment_v2/ordinal_page_index.cpp | 6 +-
be/src/olap/rowset/segment_v2/ordinal_page_index.h | 7 +-
be/src/olap/rowset/segment_v2/page_io.cpp | 2 +-
be/src/olap/rowset/segment_v2/segment.cpp | 38 +-
be/src/olap/rowset/segment_v2/segment.h | 9 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 4 +-
be/src/olap/rowset/segment_v2/zone_map_index.cpp | 2 +-
be/src/olap/rowset/segment_v2/zone_map_index.h | 7 +-
be/src/olap/schema_change.cpp | 6 +-
be/src/olap/snapshot_manager.cpp | 70 ++--
be/src/olap/snapshot_manager.h | 12 +-
be/src/olap/storage_engine.cpp | 13 +-
be/src/olap/tablet.cpp | 10 +-
be/src/olap/tablet_manager.cpp | 37 +-
be/src/olap/tablet_meta.cpp | 7 +-
be/src/olap/tablet_meta.h | 2 +-
be/src/olap/task/engine_clone_task.cpp | 6 +-
be/src/olap/task/engine_storage_migration_task.cpp | 9 +-
be/src/olap/utils.cpp | 116 ------
be/src/olap/utils.h | 4 -
be/src/runtime/snapshot_loader.cpp | 8 +-
be/src/service/backend_service.cpp | 6 +-
be/src/service/doris_main.cpp | 6 +
be/src/tools/meta_tool.cpp | 8 +-
be/src/util/broker_storage_backend.cpp | 28 +-
be/src/util/broker_storage_backend.h | 9 +-
be/src/util/coding.cpp | 1 -
be/src/util/coding.h | 1 -
be/src/util/file_utils.cpp | 106 +----
be/src/util/file_utils.h | 9 +-
be/src/util/path_util.cpp | 7 +
be/src/util/path_util.h | 4 +
be/src/util/s3_storage_backend.cpp | 112 ++++-
be/src/util/s3_storage_backend.h | 9 +-
be/src/util/s3_util.cpp | 25 +-
be/src/util/s3_util.h | 10 +
be/src/util/storage_backend.h | 9 +-
be/test/env/env_posix_test.cpp | 19 +-
be/test/olap/cumulative_compaction_policy_test.cpp | 4 +-
be/test/olap/delete_handler_test.cpp | 12 +-
be/test/olap/file_utils_test.cpp | 8 +-
be/test/olap/fs/file_block_manager_test.cpp | 6 +-
be/test/olap/memory/mem_tablet_test.cpp | 3 +-
be/test/olap/olap_snapshot_converter_test.cpp | 2 +-
be/test/olap/rowset/alpha_rowset_test.cpp | 2 +-
be/test/olap/rowset/beta_rowset_test.cpp | 2 +-
be/test/olap/rowset/rowset_converter_test.cpp | 10 +-
.../olap/rowset/segment_v2/bitmap_index_test.cpp | 4 +-
.../bloom_filter_index_reader_writer_test.cpp | 4 +-
.../segment_v2/column_reader_writer_test.cpp | 24 +-
.../rowset/segment_v2/ordinal_page_index_test.cpp | 10 +-
be/test/olap/rowset/segment_v2/segment_test.cpp | 20 +-
.../olap/rowset/segment_v2/zone_map_index_test.cpp | 10 +-
be/test/olap/tablet_meta_test.cpp | 2 +-
be/test/olap/tablet_mgr_test.cpp | 8 +-
be/test/olap/tablet_test.cpp | 2 +-
be/test/olap/txn_manager_test.cpp | 10 +-
be/test/plugin/plugin_zip_test.cpp | 4 +-
be/test/tools/benchmark_tool.cpp | 2 +-
be/test/util/broker_storage_backend_test.cpp | 2 +-
be/test/util/s3_storage_backend_test.cpp | 4 +-
be/test/util/zip_util_test.cpp | 4 +-
gensrc/proto/olap_file.proto | 7 +
gensrc/thrift/Types.thrift | 1 +
114 files changed, 2302 insertions(+), 1042 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index e02668c..a6fd374 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1167,14 +1167,14 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
map<string, TDisk> disks;
for (auto& root_path_info : data_dir_infos) {
TDisk disk;
- disk.__set_root_path(root_path_info.path);
+ disk.__set_root_path(root_path_info.path_desc.filepath);
disk.__set_path_hash(root_path_info.path_hash);
disk.__set_storage_medium(root_path_info.storage_medium);
disk.__set_disk_total_capacity(root_path_info.disk_capacity);
disk.__set_data_used_capacity(root_path_info.data_used_capacity);
disk.__set_disk_available_capacity(root_path_info.available);
disk.__set_used(root_path_info.is_used);
- disks[root_path_info.path] = disk;
+ disks[root_path_info.path_desc.filepath] = disk;
}
request.__set_disks(disks);
_handle_report(request, ReportType::DISK);
@@ -1583,7 +1583,7 @@ AgentStatus TaskWorkerPool::_move_dir(const TTabletId tablet_id, const TSchemaHa
return DORIS_TASK_REQUEST_ERROR;
}
- std::string dest_tablet_dir = tablet->tablet_path();
+ std::string dest_tablet_dir = tablet->tablet_path_desc().filepath;
SnapshotLoader loader(_env, job_id, tablet_id);
Status status = loader.move(src, tablet, overwrite);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8b4e70d..09843fb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -642,6 +642,14 @@ CONF_mInt32(external_table_connect_timeout_sec, "5");
// So the value of this config should corresponding to the number of rowsets on this BE.
CONF_mInt32(segment_cache_capacity, "1000000");
+// s3 config
+CONF_String(s3_ak, "");
+CONF_String(s3_sk, "");
+CONF_String(s3_endpoint, "");
+CONF_String(s3_region, "");
+CONF_mInt32(s3_max_conn, "50");
+CONF_mInt32(s3_request_timeout_ms, "3000");
+CONF_mInt32(s3_conn_timeout_ms, "1000");
// Set to true to disable the minidump feature.
CONF_Bool(disable_minidump , "false");
diff --git a/be/src/env/CMakeLists.txt b/be/src/env/CMakeLists.txt
index 7e21c99..2819525 100644
--- a/be/src/env/CMakeLists.txt
+++ b/be/src/env/CMakeLists.txt
@@ -22,6 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/env")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/env")
add_library(Env STATIC
+ env.cpp
+ env_remote.cpp
env_posix.cpp
env_util.cpp
)
diff --git a/be/src/olap/fs/fs_util.cpp b/be/src/env/env.cpp
similarity index 53%
copy from be/src/olap/fs/fs_util.cpp
copy to be/src/env/env.cpp
index 3ff1c76..68c1aa4 100644
--- a/be/src/olap/fs/fs_util.cpp
+++ b/be/src/env/env.cpp
@@ -15,25 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-#include "olap/fs/fs_util.h"
-
-#include "common/status.h"
#include "env/env.h"
-#include "olap/fs/file_block_manager.h"
-#include "olap/storage_engine.h"
-#include "runtime/exec_env.h"
+#include "env/env_posix.h"
+#include "env/env_remote.h"
namespace doris {
-namespace fs {
-namespace fs_util {
-BlockManager* block_manager() {
- fs::BlockManagerOptions bm_opts;
- bm_opts.read_only = false;
- static FileBlockManager block_mgr(Env::Default(), std::move(bm_opts));
- return &block_mgr;
+std::shared_ptr<PosixEnv> Env::_posix_env(new PosixEnv());
+std::shared_ptr<RemoteEnv> Env::_remote_env(new RemoteEnv());
+
+// Default Posix Env
+Env *Env::Default() {
+ return _posix_env.get();
+}
+
+Env* Env::get_env(TStorageMedium::type storage_medium) {
+ switch (storage_medium) {
+ case TStorageMedium::S3:
+ return _remote_env.get();
+ case TStorageMedium::SSD:
+ case TStorageMedium::HDD:
+ default:
+ return Default();
+ }
+}
+
+Status Env::init() {
+ RETURN_IF_ERROR(_posix_env->init_conf());
+ RETURN_IF_ERROR(_remote_env->init_conf());
+ LOG(INFO) << "Env init successfully.";
+ return Status::OK();
}
-} // namespace fs_util
-} // namespace fs
-} // namespace doris
+} // end namespace doris
\ No newline at end of file
diff --git a/be/src/env/env.h b/be/src/env/env.h
index 0b8eac7..d47e5a7 100644
--- a/be/src/env/env.h
+++ b/be/src/env/env.h
@@ -2,11 +2,9 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
-//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors
-
#pragma once
#include <memory>
@@ -14,6 +12,7 @@
#include "common/status.h"
#include "util/slice.h"
+#include "gen_cpp/Types_types.h"
namespace doris {
@@ -21,6 +20,9 @@ class RandomAccessFile;
class RandomRWFile;
class WritableFile;
class SequentialFile;
+class PosixEnv;
+class RemoteEnv;
+struct FilePathDesc;
struct WritableFileOptions;
struct RandomAccessFileOptions;
struct RandomRWFileOptions;
@@ -44,6 +46,7 @@ public:
// system. Sophisticated users may wish to provide their own Env
// implementation instead of relying on this default environment.
static Env* Default();
+ static Env* get_env(TStorageMedium::type storage_medium);
// Create a brand new sequentially-readable file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
@@ -100,7 +103,7 @@ public:
// the calling process does not have permission to determine
// whether this file exists, or if the path is invalid.
// IOError if an IO Error was encountered
- virtual Status path_exists(const std::string& fname) = 0;
+ virtual Status path_exists(const std::string& fname, bool is_dir = false) = 0;
// Store in *result the names of the children of the specified directory.
// The names are relative to "dir".
@@ -161,11 +164,114 @@ public:
// Store the last modification time of fname in *file_mtime.
virtual Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) = 0;
+
+ // copy path from src to target.
+ virtual Status copy_path(const std::string& src, const std::string& target) = 0;
// Rename file src to target.
virtual Status rename_file(const std::string& src, const std::string& target) = 0;
+ // Rename dir src to target.
+ virtual Status rename_dir(const std::string& src, const std::string& target) = 0;
// create a hard-link
virtual Status link_file(const std::string& /*old_path*/, const std::string& /*new_path*/) = 0;
+
+ // get space info for local and remote system
+ virtual Status get_space_info(const std::string& path, int64_t* capacity, int64_t* available) = 0;
+
+ virtual bool is_remote_env() = 0;
+
+ // Create directory of dir_path,
+ // This function will create directory recursively,
+ // if dir's parent directory doesn't exist
+ //
+ // RETURNS:
+ // Status::OK() if create directory success or directory already exists
+ virtual Status create_dirs(const std::string& dirname) = 0;
+
+ static Status init();
+
+ virtual Status init_conf() = 0;
+
+private:
+ static std::shared_ptr<PosixEnv> _posix_env;
+ static std::shared_ptr<RemoteEnv> _remote_env;
+};
+
+struct FilePathDesc {
+ FilePathDesc(const std::string& path) {
+ filepath = path;
+ }
+ FilePathDesc() {}
+ TStorageMedium::type storage_medium = TStorageMedium::HDD;
+ std::string filepath;
+ std::string remote_path;
+ std::string debug_string() const {
+ std::stringstream ss;
+ ss << "local_path: " << filepath;
+ if (!remote_path.empty()) {
+ ss << ", remote_path: " << remote_path;
+ }
+ return ss.str();
+ }
+};
+
+class FilePathDescStream {
+public:
+ FilePathDescStream& operator<<(const FilePathDesc& val) {
+ _filepath_stream << val.filepath;
+ _storage_medium = val.storage_medium;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val.remote_path;
+ }
+ return *this;
+ }
+ FilePathDescStream& operator<<(const std::string& val) {
+ _filepath_stream << val;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val;
+ }
+ return *this;
+ }
+ FilePathDescStream& operator<<(uint64_t val) {
+ _filepath_stream << val;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val;
+ }
+ return *this;
+ }
+ FilePathDescStream& operator<<(int64_t val) {
+ _filepath_stream << val;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val;
+ }
+ return *this;
+ }
+ FilePathDescStream& operator<<(uint32_t val) {
+ _filepath_stream << val;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val;
+ }
+ return *this;
+ }
+ FilePathDescStream& operator<<(int32_t val) {
+ _filepath_stream << val;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ _remote_path_stream << val;
+ }
+ return *this;
+ }
+ FilePathDesc path_desc() {
+ FilePathDesc path_desc(_filepath_stream.str());
+ path_desc.storage_medium = _storage_medium;
+ if (Env::get_env(_storage_medium)->is_remote_env()) {
+ path_desc.remote_path = _remote_path_stream.str();
+ }
+ return path_desc;
+ }
+private:
+ TStorageMedium::type _storage_medium = TStorageMedium::HDD;
+ std::stringstream _filepath_stream;
+ std::stringstream _remote_path_stream;
};
struct RandomAccessFileOptions {
@@ -231,7 +337,7 @@ public:
// possible to read exactly 'length' bytes, an IOError is returned.
//
// Safe for concurrent use by multiple threads.
- virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
+ virtual Status read_at(uint64_t offset, const Slice* result) const = 0;
// Reads up to the "results" aggregate size, based on each Slice's "size",
// from the file starting at 'offset'. The Slices must point to already-allocated
@@ -244,7 +350,10 @@ public:
// possible to read exactly 'length' bytes, an IOError is returned.
//
// Safe for concurrent use by multiple threads.
- virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0;
+ virtual Status readv_at(uint64_t offset, const Slice* result, size_t res_cnt) const = 0;
+
+ // read all data from this file
+ virtual Status read_all(std::string* content) const = 0;
// Return the size of this file
virtual Status size(uint64_t* size) const = 0;
@@ -315,7 +424,7 @@ public:
RandomRWFile() {}
virtual ~RandomRWFile() {}
- virtual Status read_at(uint64_t offset, const Slice& result) const = 0;
+ virtual Status read_at(uint64_t offset, const Slice* result) const = 0;
virtual Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const = 0;
diff --git a/be/src/env/env_posix.cpp b/be/src/env/env_posix.cpp
index 0ccdcb3..0466504 100644
--- a/be/src/env/env_posix.cpp
+++ b/be/src/env/env_posix.cpp
@@ -1,11 +1,19 @@
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under both the GPLv2 (found in the
-// COPYING file in the root directory) and Apache 2.0 License
-// (found in the LICENSE.Apache file in the root directory).
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
//
-// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file. See the AUTHORS file for names of contributors
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
#include <dirent.h>
#include <fcntl.h>
@@ -16,9 +24,12 @@
#include <unistd.h>
#include <memory>
+#include <filesystem>
+#include <fstream>
#include "common/logging.h"
#include "env/env.h"
+#include "env/env_posix.h"
#include "gutil/gscoped_ptr.h"
#include "gutil/macros.h"
#include "gutil/port.h"
@@ -266,13 +277,38 @@ public:
}
}
- Status read_at(uint64_t offset, const Slice& result) const override {
- return do_readv_at(_fd, _filename, offset, &result, 1);
+ Status read_at(uint64_t offset, const Slice* result) const override {
+ return readv_at(offset, result, 1);
+ }
+
+ Status readv_at(uint64_t offset, const Slice* result, size_t res_cnt) const override {
+ return do_readv_at(_fd, _filename, offset, result, res_cnt);
}
- Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
- return do_readv_at(_fd, _filename, offset, res, res_cnt);
+ Status read_all(std::string* content) const override {
+ std::fstream fs(_filename.c_str(), std::fstream::in);
+ if (!fs.is_open()) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::IOError(
+ strings::Substitute("failed to open cluster id file $0", _filename)),
+ "open file failed");
+ }
+ std::string data;
+ fs >> data;
+ fs.close();
+ if ((fs.rdstate() & std::fstream::eofbit) != 0) {
+ *content = data;
+ } else {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::Corruption(strings::Substitute(
+ "read_all from file $0 is corrupt. [eofbit=$1 failbit=$2 badbit=$3]",
+ _filename, fs.rdstate() & std::fstream::eofbit,
+ fs.rdstate() & std::fstream::failbit, fs.rdstate() & std::fstream::badbit)),
+ "read_all is error");
+ }
+ return Status::OK();
}
+
Status size(uint64_t* size) const override {
struct stat st;
auto res = fstat(_fd, &st);
@@ -413,12 +449,12 @@ public:
~PosixRandomRWFile() { WARN_IF_ERROR(close(), "Failed to close " + _filename); }
- virtual Status read_at(uint64_t offset, const Slice& result) const override {
- return do_readv_at(_fd, _filename, offset, &result, 1);
+ Status read_at(uint64_t offset, const Slice* result) const override {
+ return readv_at(offset, result, 1);
}
- Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
- return do_readv_at(_fd, _filename, offset, res, res_cnt);
+ Status readv_at(uint64_t offset, const Slice* result, size_t res_cnt) const override {
+ return do_readv_at(_fd, _filename, offset, result, res_cnt);
}
Status write_at(uint64_t offset, const Slice& data) override {
@@ -491,219 +527,289 @@ private:
bool _closed = false;
};
-class PosixEnv : public Env {
-public:
- ~PosixEnv() override {}
+Status PosixEnv::init_conf() {
+ return Status::OK();
+}
- Status new_sequential_file(const string& fname,
- std::unique_ptr<SequentialFile>* result) override {
- FILE* f;
- POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
- if (f == nullptr) {
- return io_error(fname, errno);
- }
- result->reset(new PosixSequentialFile(fname, f));
- return Status::OK();
+Status PosixEnv::new_sequential_file(const string& fname,
+ std::unique_ptr<SequentialFile>* result) {
+ FILE* f;
+ POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
+ if (f == nullptr) {
+ return io_error(fname, errno);
}
+ result->reset(new PosixSequentialFile(fname, f));
+ return Status::OK();
+}
- // get a RandomAccessFile pointer without file cache
- Status new_random_access_file(const std::string& fname,
- std::unique_ptr<RandomAccessFile>* result) override {
- return new_random_access_file(RandomAccessFileOptions(), fname, result);
- }
+// get a RandomAccessFile pointer without file cache
+Status PosixEnv::new_random_access_file(const std::string& fname,
+ std::unique_ptr<RandomAccessFile>* result) {
+ return new_random_access_file(RandomAccessFileOptions(), fname, result);
+}
- Status new_random_access_file(const RandomAccessFileOptions& opts, const std::string& fname,
- std::unique_ptr<RandomAccessFile>* result) override {
- int fd;
- RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDONLY));
- if (fd < 0) {
- return io_error(fname, errno);
- }
- result->reset(new PosixRandomAccessFile(fname, fd));
- return Status::OK();
+Status PosixEnv::new_random_access_file(const RandomAccessFileOptions& opts, const std::string& fname,
+ std::unique_ptr<RandomAccessFile>* result) {
+ int fd;
+ RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDONLY));
+ if (fd < 0) {
+ return io_error(fname, errno);
}
+ result->reset(new PosixRandomAccessFile(fname, fd));
+ return Status::OK();
+}
- Status new_writable_file(const string& fname, std::unique_ptr<WritableFile>* result) override {
- return new_writable_file(WritableFileOptions(), fname, result);
- }
+Status PosixEnv::new_writable_file(const string& fname, std::unique_ptr<WritableFile>* result) {
+ return new_writable_file(WritableFileOptions(), fname, result);
+}
- Status new_writable_file(const WritableFileOptions& opts, const string& fname,
- std::unique_ptr<WritableFile>* result) override {
- int fd;
- RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
+Status PosixEnv::new_writable_file(const WritableFileOptions& opts, const string& fname,
+ std::unique_ptr<WritableFile>* result) {
+ int fd;
+ RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
- uint64_t file_size = 0;
- if (opts.mode == MUST_EXIST) {
- RETURN_IF_ERROR(get_file_size(fname, &file_size));
- }
- result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
- return Status::OK();
+ uint64_t file_size = 0;
+ if (opts.mode == MUST_EXIST) {
+ RETURN_IF_ERROR(get_file_size(fname, &file_size));
}
+ result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
+ return Status::OK();
+}
+
+Status PosixEnv::new_random_rw_file(const string& fname, std::unique_ptr<RandomRWFile>* result) {
+ return new_random_rw_file(RandomRWFileOptions(), fname, result);
+}
+
+Status PosixEnv::new_random_rw_file(const RandomRWFileOptions& opts, const string& fname,
+ std::unique_ptr<RandomRWFile>* result) {
+ int fd;
+ RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
+ result->reset(new PosixRandomRWFile(fname, fd, opts.sync_on_close));
+ return Status::OK();
+}
- Status new_random_rw_file(const string& fname, std::unique_ptr<RandomRWFile>* result) override {
- return new_random_rw_file(RandomRWFileOptions(), fname, result);
+Status PosixEnv::path_exists(const std::string& fname, bool is_dir) {
+ if (access(fname.c_str(), F_OK) != 0) {
+ return io_error(fname, errno);
}
+ return Status::OK();
+}
- Status new_random_rw_file(const RandomRWFileOptions& opts, const string& fname,
- std::unique_ptr<RandomRWFile>* result) override {
- int fd;
- RETURN_IF_ERROR(do_open(fname, opts.mode, &fd));
- result->reset(new PosixRandomRWFile(fname, fd, opts.sync_on_close));
- return Status::OK();
+Status PosixEnv::get_children(const std::string& dir, std::vector<std::string>* result) {
+ result->clear();
+ DIR* d = opendir(dir.c_str());
+ if (d == nullptr) {
+ return io_error(dir, errno);
+ }
+ struct dirent* entry;
+ while ((entry = readdir(d)) != nullptr) {
+ result->push_back(entry->d_name);
}
+ closedir(d);
+ return Status::OK();
+}
- Status path_exists(const std::string& fname) override {
- if (access(fname.c_str(), F_OK) != 0) {
- return io_error(fname, errno);
+Status PosixEnv::iterate_dir(const std::string& dir,
+ const std::function<bool(const char*)>& cb) {
+ DIR* d = opendir(dir.c_str());
+ if (d == nullptr) {
+ return io_error(dir, errno);
+ }
+ struct dirent* entry;
+ while ((entry = readdir(d)) != nullptr) {
+ // callback returning false means to terminate iteration
+ if (!cb(entry->d_name)) {
+ break;
}
- return Status::OK();
}
+ closedir(d);
+ return Status::OK();
+}
- Status get_children(const std::string& dir, std::vector<std::string>* result) override {
- result->clear();
- DIR* d = opendir(dir.c_str());
- if (d == nullptr) {
- return io_error(dir, errno);
- }
- struct dirent* entry;
- while ((entry = readdir(d)) != nullptr) {
- result->push_back(entry->d_name);
- }
- closedir(d);
- return Status::OK();
+Status PosixEnv::delete_file(const std::string& fname) {
+ if (unlink(fname.c_str()) != 0) {
+ return io_error(fname, errno);
}
+ return Status::OK();
+}
- Status iterate_dir(const std::string& dir,
- const std::function<bool(const char*)>& cb) override {
- DIR* d = opendir(dir.c_str());
- if (d == nullptr) {
- return io_error(dir, errno);
- }
- struct dirent* entry;
- while ((entry = readdir(d)) != nullptr) {
- // callback returning false means to terminate iteration
- if (!cb(entry->d_name)) {
- break;
- }
- }
- closedir(d);
- return Status::OK();
+Status PosixEnv::create_dir(const std::string& name) {
+ if (mkdir(name.c_str(), 0755) != 0) {
+ return io_error(name, errno);
}
+ return Status::OK();
+}
- Status delete_file(const std::string& fname) override {
- if (unlink(fname.c_str()) != 0) {
- return io_error(fname, errno);
- }
- return Status::OK();
+Status PosixEnv::create_dir_if_missing(const string& dirname, bool* created) {
+ Status s = create_dir(dirname);
+ if (created != nullptr) {
+ *created = s.ok();
}
- Status create_dir(const std::string& name) override {
- if (mkdir(name.c_str(), 0755) != 0) {
- return io_error(name, errno);
+ // Check that dirname is actually a directory.
+ if (s.is_already_exist()) {
+ bool is_dir = false;
+ RETURN_IF_ERROR(is_directory(dirname, &is_dir));
+ if (is_dir) {
+ return Status::OK();
+ } else {
+ return s.clone_and_append("path already exists but not a dir");
}
- return Status::OK();
+ }
+ return s;
+}
+
+Status PosixEnv::create_dirs(const string& dirname) {
+ if (dirname.empty()) {
+ return Status::InvalidArgument(strings::Substitute("Unknown primitive type($0)", dirname));
}
- Status create_dir_if_missing(const string& dirname, bool* created = nullptr) override {
- Status s = create_dir(dirname);
- if (created != nullptr) {
- *created = s.ok();
- }
+ std::filesystem::path p(dirname);
+
+ std::string partial_path;
+ for (std::filesystem::path::iterator it = p.begin(); it != p.end(); ++it) {
+ partial_path = partial_path + it->string() + "/";
+ bool is_dir = false;
- // Check that dirname is actually a directory.
- if (s.is_already_exist()) {
- bool is_dir = false;
- RETURN_IF_ERROR(is_directory(dirname, &is_dir));
+ Status s = is_directory(partial_path, &is_dir);
+
+ if (s.ok()) {
if (is_dir) {
- return Status::OK();
+ // It's a normal directory.
+ continue;
+ }
+
+ // Maybe a file or a symlink. Let's try to follow the symlink.
+ std::string real_partial_path;
+ RETURN_IF_ERROR(canonicalize(partial_path, &real_partial_path));
+
+ RETURN_IF_ERROR(is_directory(real_partial_path, &is_dir));
+ if (is_dir) {
+ // It's a symlink to a directory.
+ continue;
} else {
- return s.clone_and_append("path already exists but not a dir");
+ return Status::IOError(partial_path + " exists but is not a directory");
}
}
- return s;
+
+ RETURN_IF_ERROR(create_dir_if_missing(partial_path));
}
- // Delete the specified directory.
- Status delete_dir(const std::string& dirname) override {
- if (rmdir(dirname.c_str()) != 0) {
- return io_error(dirname, errno);
- }
- return Status::OK();
+ return Status::OK();
+}
+
+// Delete the specified directory.
+Status PosixEnv::delete_dir(const std::string& dirname) {
+ std::filesystem::path boost_path(dirname);
+ std::error_code ec;
+ std::filesystem::remove_all(boost_path, ec);
+ if (ec) {
+ std::stringstream ss;
+ ss << "remove all(" << dirname << ") failed, because: " << ec;
+ return Status::InternalError(ss.str());
}
+ return Status::OK();
+}
- Status sync_dir(const string& dirname) override {
- int dir_fd;
- RETRY_ON_EINTR(dir_fd, open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
- if (dir_fd < 0) {
- return io_error(dirname, errno);
- }
- ScopedFdCloser fd_closer(dir_fd);
- if (fsync(dir_fd) != 0) {
- return io_error(dirname, errno);
- }
- return Status::OK();
+Status PosixEnv::sync_dir(const string& dirname) {
+ int dir_fd;
+ RETRY_ON_EINTR(dir_fd, open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
+ if (dir_fd < 0) {
+ return io_error(dirname, errno);
}
+ ScopedFdCloser fd_closer(dir_fd);
+ if (fsync(dir_fd) != 0) {
+ return io_error(dirname, errno);
+ }
+ return Status::OK();
+}
- Status is_directory(const std::string& path, bool* is_dir) override {
- struct stat path_stat;
- if (stat(path.c_str(), &path_stat) != 0) {
- return io_error(path, errno);
- } else {
- *is_dir = S_ISDIR(path_stat.st_mode);
- }
+Status PosixEnv::is_directory(const std::string& path, bool* is_dir) {
+ struct stat path_stat;
+ if (stat(path.c_str(), &path_stat) != 0) {
+ return io_error(path, errno);
+ } else {
+ *is_dir = S_ISDIR(path_stat.st_mode);
+ }
- return Status::OK();
+ return Status::OK();
+}
+
+Status PosixEnv::canonicalize(const std::string& path, std::string* result) {
+ // NOTE: we must use free() to release the buffer retruned by realpath(),
+ // because the buffer is allocated by malloc(), see `man 3 realpath`.
+ std::unique_ptr<char[], FreeDeleter> r(realpath(path.c_str(), nullptr));
+ if (r == nullptr) {
+ return io_error(strings::Substitute("Unable to canonicalize $0", path), errno);
}
+ *result = std::string(r.get());
+ return Status::OK();
+}
- Status canonicalize(const std::string& path, std::string* result) override {
- // NOTE: we must use free() to release the buffer retruned by realpath(),
- // because the buffer is allocated by malloc(), see `man 3 realpath`.
- std::unique_ptr<char[], FreeDeleter> r(realpath(path.c_str(), nullptr));
- if (r == nullptr) {
- return io_error(strings::Substitute("Unable to canonicalize $0", path), errno);
- }
- *result = std::string(r.get());
- return Status::OK();
+Status PosixEnv::get_file_size(const string& fname, uint64_t* size) {
+ struct stat sbuf;
+ if (stat(fname.c_str(), &sbuf) != 0) {
+ return io_error(fname, errno);
+ } else {
+ *size = sbuf.st_size;
}
+ return Status::OK();
+}
- Status get_file_size(const string& fname, uint64_t* size) override {
- struct stat sbuf;
- if (stat(fname.c_str(), &sbuf) != 0) {
- return io_error(fname, errno);
- } else {
- *size = sbuf.st_size;
- }
- return Status::OK();
+Status PosixEnv::get_file_modified_time(const std::string& fname, uint64_t* file_mtime) {
+ struct stat s;
+ if (stat(fname.c_str(), &s) != 0) {
+ return io_error(fname, errno);
}
+ *file_mtime = static_cast<uint64_t>(s.st_mtime);
+ return Status::OK();
+}
- Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) override {
- struct stat s;
- if (stat(fname.c_str(), &s) != 0) {
- return io_error(fname, errno);
- }
- *file_mtime = static_cast<uint64_t>(s.st_mtime);
- return Status::OK();
+Status PosixEnv::copy_path(const std::string& src, const std::string& target) {
+ try {
+ std::filesystem::copy(src, target, std::filesystem::copy_options::recursive);
+ } catch (const std::filesystem::filesystem_error& e) {
+ std::stringstream ss;
+ ss << "failed to copy_path: from " << src << " to " << target << ". err: " << e.what();
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+ return Status::OK();
+}
+
+Status PosixEnv::rename_file(const std::string& src, const std::string& target) {
+ if (rename(src.c_str(), target.c_str()) != 0) {
+ return io_error(src, errno);
}
+ return Status::OK();
+}
- Status rename_file(const std::string& src, const std::string& target) override {
- if (rename(src.c_str(), target.c_str()) != 0) {
- return io_error(src, errno);
- }
- return Status::OK();
+Status PosixEnv::rename_dir(const std::string& src, const std::string& target) {
+ return rename_file(src, target);
+}
+
+Status PosixEnv::link_file(const std::string& old_path, const std::string& new_path) {
+ if (link(old_path.c_str(), new_path.c_str()) != 0) {
+ return io_error(old_path, errno);
}
+ return Status::OK();
+}
- Status link_file(const std::string& old_path, const std::string& new_path) override {
- if (link(old_path.c_str(), new_path.c_str()) != 0) {
- return io_error(old_path, errno);
+Status PosixEnv::get_space_info(const std::string& path, int64_t* capacity, int64_t* available) {
+ try {
+ std::filesystem::path path_name(path);
+ std::filesystem::space_info path_info = std::filesystem::space(path_name);
+ if (*capacity <= 0) {
+ *capacity = path_info.capacity;
}
- return Status::OK();
+ *available = path_info.available;
+ } catch (std::filesystem::filesystem_error& e) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::IOError(strings::Substitute(
+ "get path $0 available capacity failed, error=$1", path, e.what())),
+ "std::filesystem::space failed");
}
-};
-
-// Default Posix Env
-Env* Env::Default() {
- static PosixEnv default_env;
- return &default_env;
+ return Status::OK();
}
} // end namespace doris
diff --git a/be/src/env/env_posix.h b/be/src/env/env_posix.h
new file mode 100644
index 0000000..d9caf6b
--- /dev/null
+++ b/be/src/env/env_posix.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "env/env.h"
+
+namespace doris {
+
+class RandomAccessFile;
+class RandomRWFile;
+class WritableFile;
+class SequentialFile;
+struct WritableFileOptions;
+struct RandomAccessFileOptions;
+struct RandomRWFileOptions;
+
+// Env implementation that operates on the local filesystem (see env_posix.cpp).
+class PosixEnv : public Env {
+public:
+    ~PosixEnv() override = default;
+
+    Status init_conf();
+
+    // ---- file handles ------------------------------------------------------
+    Status new_sequential_file(const std::string& fname,
+                               std::unique_ptr<SequentialFile>* result) override;
+
+    // get a RandomAccessFile pointer without file cache
+    Status new_random_access_file(const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override;
+    Status new_random_access_file(const RandomAccessFileOptions& opts, const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override;
+
+    Status new_writable_file(const std::string& fname,
+                             std::unique_ptr<WritableFile>* result) override;
+    Status new_writable_file(const WritableFileOptions& opts, const std::string& fname,
+                             std::unique_ptr<WritableFile>* result) override;
+
+    Status new_random_rw_file(const std::string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override;
+    Status new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override;
+
+    // ---- paths and directories -------------------------------------------
+    Status path_exists(const std::string& fname, bool is_dir = false) override;
+    Status get_children(const std::string& dir, std::vector<std::string>* result) override;
+    Status iterate_dir(const std::string& dir,
+                       const std::function<bool(const char*)>& cb) override;
+    Status delete_file(const std::string& fname) override;
+    Status create_dir(const std::string& name) override;
+    Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) override;
+    Status create_dirs(const std::string& dirname) override;
+    // Delete the specified directory.
+    Status delete_dir(const std::string& dirname) override;
+    Status sync_dir(const std::string& dirname) override;
+    Status is_directory(const std::string& path, bool* is_dir) override;
+    Status canonicalize(const std::string& path, std::string* result) override;
+
+    // ---- metadata, copy, rename, link ------------------------------------
+    Status get_file_size(const std::string& fname, uint64_t* size) override;
+    Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) override;
+    Status copy_path(const std::string& src, const std::string& target) override;
+    Status rename_file(const std::string& src, const std::string& target) override;
+    Status rename_dir(const std::string& src, const std::string& target) override;
+    Status link_file(const std::string& old_path, const std::string& new_path) override;
+
+    // ---- space accounting -------------------------------------------------
+    Status get_space_info(const std::string& path, int64_t* capacity, int64_t* available) override;
+
+    // A local env: never remote.
+    bool is_remote_env() override { return false; }
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/env/env_remote.cpp b/be/src/env/env_remote.cpp
new file mode 100644
index 0000000..d359b6a
--- /dev/null
+++ b/be/src/env/env_remote.cpp
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "env/env.h"
+#include "env/env_remote.h"
+#include "util/s3_storage_backend.h"
+#include "util/s3_util.h"
+
+#include "gutil/strings/substitute.h"
+
+namespace doris {
+
+using std::string;
+using strings::Substitute;
+
+// Read-only handle on one remote object. Only whole-object reads (read_all) are
+// implemented; positional reads and size() report NotSupported.
+class RemoteRandomAccessFile : public RandomAccessFile {
+public:
+    RemoteRandomAccessFile(std::string filename, std::shared_ptr<StorageBackend> storage_backend)
+            : _filename(std::move(filename)), _storage_backend(std::move(storage_backend)) {}
+    ~RemoteRandomAccessFile() override = default;
+
+    Status read_at(uint64_t offset, const Slice* result) const override {
+        return readv_at(offset, result, 1);
+    }
+
+    Status readv_at(uint64_t offset, const Slice* result, size_t res_cnt) const override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    // Downloads the whole object into `content`.
+    Status read_all(std::string* content) const override {
+        if (_storage_backend == nullptr) {
+            // RemoteEnv::init_conf() leaves the backend unset when the S3 config is invalid;
+            // fail cleanly instead of dereferencing a null pointer.
+            return Status::InternalError("remote storage backend is not initialized");
+        }
+        return _storage_backend->direct_download(_filename, content);
+    }
+
+    Status size(uint64_t* size) const override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    const std::string& file_name() const override { return _filename; }
+
+private:
+    const std::string _filename;
+    std::shared_ptr<StorageBackend> _storage_backend;
+};
+
+// Writable handle for one remote object.
+// Remote objects cannot be appended to in place, so appended data is buffered in
+// memory and the complete buffer is uploaded with direct_upload() on flush(), sync()
+// and close() (close() is also run by the destructor). Each upload replaces the
+// whole object: content that existed before this handle was opened (the initial
+// `filesize`) is NOT preserved remotely.
+class RemoteWritableFile : public WritableFile {
+public:
+    RemoteWritableFile(std::string filename, std::shared_ptr<StorageBackend> storage_backend,
+                       uint64_t filesize)
+            : _filename(std::move(filename)),
+              _storage_backend(std::move(storage_backend)),
+              _filesize(filesize) {}
+
+    ~RemoteWritableFile() override {
+        WARN_IF_ERROR(close(), "Failed to close file, file=" + _filename);
+    }
+
+    Status append(const Slice& data) override { return appendv(&data, 1); }
+
+    // Buffers `data_cnt` slices. Previously every call uploaded only its own slices,
+    // overwriting whatever earlier appends had written.
+    Status appendv(const Slice* data, size_t data_cnt) override {
+        if (_closed) {
+            return Status::IOError(strings::Substitute("append to closed file: $0", _filename));
+        }
+        for (size_t i = 0; i < data_cnt; i++) {
+            _buffer.append(data[i].data, data[i].size);
+            _filesize += data[i].size;
+        }
+        _dirty = true;
+        return Status::OK();
+    }
+
+    Status pre_allocate(uint64_t size) override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    // Uploads pending data (if any); further closes are no-ops. If the upload fails the
+    // handle stays open so the destructor can retry.
+    Status close() override {
+        if (_closed) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(_upload_if_dirty());
+        _closed = true;
+        return Status::OK();
+    }
+
+    Status flush(FlushMode mode) override { return _upload_if_dirty(); }
+
+    Status sync() override { return _upload_if_dirty(); }
+
+    uint64_t size() const override { return _filesize; }
+    const string& filename() const override { return _filename; }
+
+private:
+    // Uploads the full buffered content, replacing the remote object, when there are
+    // appends that have not been uploaded yet.
+    Status _upload_if_dirty() {
+        if (!_dirty) {
+            return Status::OK();
+        }
+        if (_storage_backend == nullptr) {
+            return Status::InternalError("remote storage backend is not initialized");
+        }
+        Status status = _storage_backend->direct_upload(_filename, _buffer);
+        RETURN_NOT_OK_STATUS_WITH_WARN(status, strings::Substitute(
+                "direct_upload failed: $0, err=$1", _filename, status.to_string()));
+        _dirty = false;
+        return Status::OK();
+    }
+
+    std::string _filename;
+    std::shared_ptr<StorageBackend> _storage_backend;
+    uint64_t _filesize = 0;
+    // Everything appended through this handle, uploaded as one object.
+    std::string _buffer;
+    // True when _buffer holds appends that have not been uploaded yet.
+    bool _dirty = false;
+    bool _closed = false;
+};
+
+// Placeholder random read/write handle for remote storage: every data operation
+// reports NotSupported. close() only marks the handle closed, so destroying it no
+// longer logs a "Failed to close" warning every time.
+class RemoteRandomRWFile : public RandomRWFile {
+public:
+    explicit RemoteRandomRWFile(const FilePathDesc& path_desc) : _path_desc(path_desc) {}
+
+    ~RemoteRandomRWFile() override {
+        WARN_IF_ERROR(close(), "Failed to close " + _path_desc.filepath);
+    }
+
+    Status read_at(uint64_t offset, const Slice& result) const override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    Status readv_at(uint64_t offset, const Slice* res, size_t res_cnt) const override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    Status write_at(uint64_t offset, const Slice& data) override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    Status writev_at(uint64_t offset, const Slice* data, size_t data_cnt) override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    Status flush(FlushMode mode, uint64_t offset, size_t length) override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    Status sync() override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    // Nothing is buffered, so closing only marks the handle closed.
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    Status size(uint64_t* size) const override {
+        return Status::NotSupported("No support", 1, "");
+    }
+
+    const string& filename() const override { return _path_desc.filepath; }
+
+private:
+    const FilePathDesc _path_desc;
+    bool _closed = false;
+};
+
+// Builds the S3 properties from the BE config and creates the storage backend.
+// When ClientFactory rejects the properties, the backend is left unset and OK is
+// still returned.
+Status RemoteEnv::init_conf() {
+    std::map<std::string, std::string> storage_prop = {
+            {S3_AK, doris::config::s3_ak},
+            {S3_SK, doris::config::s3_sk},
+            {S3_ENDPOINT, doris::config::s3_endpoint},
+            {S3_REGION, doris::config::s3_region},
+            {S3_MAX_CONN_SIZE, std::to_string(doris::config::s3_max_conn)},
+            {S3_REQUEST_TIMEOUT_MS, std::to_string(doris::config::s3_request_timeout_ms)},
+            {S3_CONN_TIMEOUT_MS, std::to_string(doris::config::s3_conn_timeout_ms)},
+    };
+
+    if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
+        return Status::OK();
+    }
+    _storage_backend = std::make_shared<S3StorageBackend>(storage_prop);
+    return Status::OK();
+}
+
+// Sequential reads are not provided by the remote env.
+Status RemoteEnv::new_sequential_file(const std::string& fname,
+                                      std::unique_ptr<SequentialFile>* result) {
+    const std::string message = strings::Substitute("Unable to new_sequential_file $0", fname);
+    return Status::IOError(message, 0, "");
+}
+
+// get a RandomAccessFile pointer without file cache
+Status RemoteEnv::new_random_access_file(const std::string& fname,
+                                         std::unique_ptr<RandomAccessFile>* result) {
+    return new_random_access_file(RandomAccessFileOptions {}, fname, result);
+}
+
+// Wraps `fname` in a RemoteRandomAccessFile sharing this env's backend; `opts` is unused.
+Status RemoteEnv::new_random_access_file(const RandomAccessFileOptions& opts, const std::string& fname,
+                                         std::unique_ptr<RandomAccessFile>* result) {
+    *result = std::make_unique<RemoteRandomAccessFile>(fname, _storage_backend);
+    return Status::OK();
+}
+
+Status RemoteEnv::new_writable_file(const std::string& fname,
+                                    std::unique_ptr<WritableFile>* result) {
+    return new_writable_file(WritableFileOptions {}, fname, result);
+}
+
+// With MUST_EXIST the handle starts from the size reported by get_file_size();
+// otherwise it starts at 0.
+Status RemoteEnv::new_writable_file(const WritableFileOptions& opts, const std::string& fname,
+                                    std::unique_ptr<WritableFile>* result) {
+    uint64_t existing_size = 0;
+    if (opts.mode == MUST_EXIST) {
+        RETURN_IF_ERROR(get_file_size(fname, &existing_size));
+    }
+    *result = std::make_unique<RemoteWritableFile>(fname, _storage_backend, existing_size);
+    return Status::OK();
+}
+
+Status RemoteEnv::new_random_rw_file(const std::string& fname,
+                                     std::unique_ptr<RandomRWFile>* result) {
+    return new_random_rw_file(RandomRWFileOptions {}, fname, result);
+}
+
+// Random read/write files are not provided by the remote env.
+Status RemoteEnv::new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname,
+                                     std::unique_ptr<RandomRWFile>* result) {
+    const std::string message = strings::Substitute("Unable to new_random_rw_file $0", fname);
+    return Status::IOError(message, 0, "");
+}
+
+// Checks whether `fname` exists remotely, probing for a directory when `is_dir` is true.
+// Returns OK if it exists and the backend's NotFound status if it does not. A missing
+// path is an expected answer for an existence probe, so only other backend errors are
+// logged as warnings.
+Status RemoteEnv::path_exists(const std::string& fname, bool is_dir) {
+    std::shared_ptr<StorageBackend> storage_backend = get_storage_backend();
+    Status status = is_dir ? storage_backend->exist_dir(fname) : storage_backend->exist(fname);
+    if (status.ok() || status.is_not_found()) {
+        return status;
+    }
+    LOG(WARNING) << "path_exists failed: " << fname << ", err=" << status.to_string();
+    return status;
+}
+
+// Directory listing is not supported by the remote env.
+Status RemoteEnv::get_children(const std::string& dir, std::vector<std::string>* result) {
+    const std::string message = strings::Substitute("Unable to get_children $0", dir);
+    return Status::IOError(message, 0, "");
+}
+
+// Directory iteration is not supported by the remote env; `cb` is never invoked.
+Status RemoteEnv::iterate_dir(const std::string& dir,
+                              const std::function<bool(const char*)>& cb) {
+    const std::string message = strings::Substitute("Unable to iterate_dir $0", dir);
+    return Status::IOError(message, 0, "");
+}
+
+// Removes one object through the backend's rm(); a failure is logged and returned.
+Status RemoteEnv::delete_file(const std::string& fname) {
+    Status rm_status = get_storage_backend()->rm(fname);
+    RETURN_NOT_OK_STATUS_WITH_WARN(rm_status, strings::Substitute(
+            "delete_file failed: $0, err=$1", fname, rm_status.to_string()));
+    return Status::OK();
+}
+
+// Delegates to the backend's mkdir() and returns its status unchanged.
+Status RemoteEnv::create_dir(const std::string& name) {
+    return get_storage_backend()->mkdir(name);
+}
+
+// Creates `dirname` unless it already exists.
+// `*created` (when `created` is non-null) is set to true only if this call created the
+// directory, and to false if it already existed. Backend errors other than NotFound
+// from the existence check are returned unchanged, matching is_directory().
+Status RemoteEnv::create_dir_if_missing(const string& dirname, bool* created) {
+    std::shared_ptr<StorageBackend> storage_backend = get_storage_backend();
+    Status exist_status = storage_backend->exist_dir(dirname);
+    if (exist_status.ok()) {
+        if (created != nullptr) {
+            *created = false;
+        }
+        return Status::OK();
+    }
+    if (!exist_status.is_not_found()) {
+        return exist_status;
+    }
+    RETURN_IF_ERROR(storage_backend->mkdir(dirname));
+    if (created != nullptr) {
+        *created = true;
+    }
+    return Status::OK();
+}
+
+// Delegates to the backend's mkdirs() and returns its status unchanged.
+Status RemoteEnv::create_dirs(const string& dirname) {
+    return get_storage_backend()->mkdirs(dirname);
+}
+
+// Delete the specified directory through the backend's rmdir(); a failure is logged and returned.
+Status RemoteEnv::delete_dir(const std::string& dirname) {
+    Status rmdir_status = get_storage_backend()->rmdir(dirname);
+    RETURN_NOT_OK_STATUS_WITH_WARN(rmdir_status, strings::Substitute(
+            "delete_dir failed: $0, err=$1", dirname, rmdir_status.to_string()));
+    return Status::OK();
+}
+
+// Directory sync is not supported by the remote env.
+Status RemoteEnv::sync_dir(const string& dirname) {
+    const std::string message = strings::Substitute("Unable to sync_dir $0", dirname);
+    return Status::IOError(message, 0, "");
+}
+
+// Classifies `path`: an existing object means "not a directory"; otherwise a
+// directory probe decides. Backend errors other than NotFound are returned as-is,
+// and a path that matches neither probe is reported as not a directory.
+Status RemoteEnv::is_directory(const std::string& path, bool* is_dir) {
+    std::shared_ptr<StorageBackend> backend = get_storage_backend();
+
+    Status file_status = backend->exist(path);
+    if (!file_status.ok() && !file_status.is_not_found()) {
+        return file_status;
+    }
+    if (file_status.ok()) {
+        *is_dir = false;
+        return Status::OK();
+    }
+
+    Status dir_status = backend->exist_dir(path);
+    if (!dir_status.ok() && !dir_status.is_not_found()) {
+        return dir_status;
+    }
+    *is_dir = dir_status.ok();
+    return Status::OK();
+}
+
+// Remote paths are used verbatim; no normalisation is applied.
+Status RemoteEnv::canonicalize(const std::string& path, std::string* result) {
+    result->assign(path);
+    return Status::OK();
+}
+
+// Size lookup is not implemented for remote objects yet. The output is explicitly set
+// to 0 so callers never read an indeterminate value; OK is still returned to stay
+// compatible with existing callers such as new_writable_file(MUST_EXIST).
+// TODO: query the real object size from the storage backend.
+Status RemoteEnv::get_file_size(const std::string& fname, uint64_t* size) {
+    *size = 0;
+    return Status::OK();
+}
+
+// Modification times are not available from the remote env.
+Status RemoteEnv::get_file_modified_time(const std::string& fname, uint64_t* file_mtime) {
+    const std::string message = strings::Substitute("Unable to get_file_modified_time $0", fname);
+    return Status::IOError(message, 0, "");
+}
+
+// Recursive copy is not supported by the remote env.
+Status RemoteEnv::copy_path(const std::string& src, const std::string& target) {
+    const std::string message = strings::Substitute("Unable to copy_path $0 to $1", src, target);
+    return Status::IOError(message, 0, "");
+}
+
+// Renames an object through the backend's rename(); a failure is logged and returned.
+Status RemoteEnv::rename_file(const std::string& src, const std::string& target) {
+    Status rename_status = get_storage_backend()->rename(src, target);
+    RETURN_NOT_OK_STATUS_WITH_WARN(rename_status, strings::Substitute(
+            "rename_file failed: from $0 to $1, err=$2", src, target, rename_status.to_string()));
+    return Status::OK();
+}
+
+// Renames a directory through the backend's rename_dir(); a failure is logged and returned.
+Status RemoteEnv::rename_dir(const std::string& src, const std::string& target) {
+    Status rename_status = get_storage_backend()->rename_dir(src, target);
+    RETURN_NOT_OK_STATUS_WITH_WARN(rename_status, strings::Substitute(
+            "rename_dir failed: from $0 to $1, err=$2", src, target, rename_status.to_string()));
+    return Status::OK();
+}
+
+// There is no hard link on remote storage: the object is copied to `new_path` instead.
+Status RemoteEnv::link_file(const std::string& old_path, const std::string& new_path) {
+    Status copy_status = get_storage_backend()->copy(old_path, new_path);
+    RETURN_NOT_OK_STATUS_WITH_WARN(copy_status, strings::Substitute(
+            "link_file failed: from $0 to $1, err=$2", old_path, new_path, copy_status.to_string()));
+    return Status::OK();
+}
+
+// Writes -1 to both outputs (presumably meaning "unknown" for remote storage).
+Status RemoteEnv::get_space_info(const std::string& path, int64_t* capacity, int64_t* available) {
+    *available = -1;
+    *capacity = -1;
+    return Status::OK();
+}
+
+// Returns a shared handle to the backend; null when init_conf() did not create one.
+std::shared_ptr<StorageBackend> RemoteEnv::get_storage_backend() { return _storage_backend; }
+
+} // end namespace doris
diff --git a/be/src/env/env_remote.h b/be/src/env/env_remote.h
new file mode 100644
index 0000000..dc7fd53
--- /dev/null
+++ b/be/src/env/env_remote.h
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "env/env.h"
+#include "util/storage_backend.h"
+
+namespace doris {
+
+class RandomAccessFile;
+class RandomRWFile;
+class WritableFile;
+class SequentialFile;
+struct WritableFileOptions;
+struct RandomAccessFileOptions;
+struct RandomRWFileOptions;
+
+// Env implementation that forwards operations to a StorageBackend (an
+// S3StorageBackend created by init_conf() when the S3 config is valid).
+class RemoteEnv : public Env {
+public:
+    ~RemoteEnv() override = default;
+
+    Status init_conf();
+
+    // ---- file handles ------------------------------------------------------
+    Status new_sequential_file(const std::string& fname,
+                               std::unique_ptr<SequentialFile>* result) override;
+
+    // get a RandomAccessFile pointer without file cache
+    Status new_random_access_file(const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override;
+    Status new_random_access_file(const RandomAccessFileOptions& opts, const std::string& fname,
+                                  std::unique_ptr<RandomAccessFile>* result) override;
+
+    Status new_writable_file(const std::string& fname,
+                             std::unique_ptr<WritableFile>* result) override;
+    Status new_writable_file(const WritableFileOptions& opts, const std::string& fname,
+                             std::unique_ptr<WritableFile>* result) override;
+
+    Status new_random_rw_file(const std::string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override;
+    Status new_random_rw_file(const RandomRWFileOptions& opts, const std::string& fname,
+                              std::unique_ptr<RandomRWFile>* result) override;
+
+    // ---- paths and directories -------------------------------------------
+    Status path_exists(const std::string& fname, bool is_dir = false) override;
+    Status get_children(const std::string& dir, std::vector<std::string>* result) override;
+    Status iterate_dir(const std::string& dir,
+                       const std::function<bool(const char*)>& cb) override;
+    Status delete_file(const std::string& fname) override;
+    Status create_dir(const std::string& name) override;
+    Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) override;
+    Status create_dirs(const std::string& dirname) override;
+    // Delete the specified directory.
+    Status delete_dir(const std::string& dirname) override;
+    Status sync_dir(const std::string& dirname) override;
+    Status is_directory(const std::string& path, bool* is_dir) override;
+    Status canonicalize(const std::string& path, std::string* result) override;
+
+    // ---- metadata, copy, rename, link ------------------------------------
+    Status get_file_size(const std::string& fname, uint64_t* size) override;
+    Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) override;
+    Status copy_path(const std::string& src, const std::string& target) override;
+    Status rename_file(const std::string& src, const std::string& target) override;
+    Status rename_dir(const std::string& src, const std::string& target) override;
+    Status link_file(const std::string& old_path, const std::string& new_path) override;
+
+    // ---- space accounting -------------------------------------------------
+    Status get_space_info(const std::string& path, int64_t* capacity, int64_t* available) override;
+
+    // A remote env: always true.
+    bool is_remote_env() override { return true; }
+
+    // Shared handle to the backend; may be null if init_conf() did not create one.
+    std::shared_ptr<StorageBackend> get_storage_backend();
+
+private:
+    std::shared_ptr<StorageBackend> _storage_backend;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 0f634ba..5406ac6 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -64,11 +64,15 @@ OLAPStatus BaseTablet::set_tablet_state(TabletState state) {
+// Fills _tablet_path_desc (only when a data dir is set) by appending DATA_PREFIX,
+// then the shard id, tablet id and schema hash segments (joined with
+// path_util::join_path_desc_segments) to the data dir's path descriptor.
+// For a remote env, "/<tablet uid>" is additionally appended to remote_path.
void BaseTablet::_gen_tablet_path() {
if (_data_dir != nullptr) {
- std::string path = _data_dir->path() + DATA_PREFIX;
- path = path_util::join_path_segments(path, std::to_string(_tablet_meta->shard_id()));
- path = path_util::join_path_segments(path, std::to_string(_tablet_meta->tablet_id()));
- path = path_util::join_path_segments(path, std::to_string(_tablet_meta->schema_hash()));
- _tablet_path = path;
+ FilePathDescStream desc_s;
+ desc_s << _data_dir->path_desc() << DATA_PREFIX;
+ FilePathDesc path_desc = path_util::join_path_desc_segments(
+ desc_s.path_desc(), std::to_string(_tablet_meta->shard_id()));
+ path_desc = path_util::join_path_desc_segments(path_desc, std::to_string(_tablet_meta->tablet_id()));
+ _tablet_path_desc = path_util::join_path_desc_segments(path_desc, std::to_string(_tablet_meta->schema_hash()));
+ if (Env::get_env(_tablet_path_desc.storage_medium)->is_remote_env()) {
+ _tablet_path_desc.remote_path += "/" + _tablet_meta->tablet_uid().to_string();
+ }
}
}
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 3e1386b..97cc99f 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -39,7 +39,7 @@ public:
virtual ~BaseTablet();
inline DataDir* data_dir() const;
- std::string tablet_path() const;
+ FilePathDesc tablet_path_desc() const;
TabletState tablet_state() const { return _state; }
OLAPStatus set_tablet_state(TabletState state);
@@ -73,7 +73,7 @@ protected:
const TabletSchema& _schema;
DataDir* _data_dir;
- std::string _tablet_path;
+ FilePathDesc _tablet_path_desc;
// metrics of this tablet
std::shared_ptr<MetricEntity> _metric_entity = nullptr;
@@ -93,8 +93,8 @@ inline DataDir* BaseTablet::data_dir() const {
return _data_dir;
}
-inline std::string BaseTablet::tablet_path() const {
- return _tablet_path;
+// Returns, by value, the path descriptor that _gen_tablet_path() stores in _tablet_path_desc.
+inline FilePathDesc BaseTablet::tablet_path_desc() const {
+ return _tablet_path_desc;
}
inline const TabletMetaSharedPtr BaseTablet::tablet_meta() {
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index f735be8..d6fc2e9 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -159,7 +159,7 @@ OLAPStatus Compaction::construct_output_rowset_writer() {
if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
- context.rowset_path_prefix = _tablet->tablet_path();
+ context.path_desc = _tablet->tablet_path_desc();
context.tablet_schema = &(_tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.version = _output_version;
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 9000ce2..da43f5b 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -28,13 +28,10 @@
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
-#include <boost/interprocess/sync/file_lock.hpp>
-#include <filesystem>
-#include <fstream>
#include <set>
#include <sstream>
-#include "env/env.h"
+#include "env/env_util.h"
#include "gutil/strings/substitute.h"
#include "olap/file_helper.h"
#include "olap/olap_define.h"
@@ -66,9 +63,9 @@ static const char* const kMtabPath = "/etc/mtab";
static const char* const kTestFilePath = "/.testfile";
DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
- TStorageMedium::type storage_medium, TabletManager* tablet_manager,
- TxnManager* txn_manager)
- : _path(path),
+ TStorageMedium::type storage_medium, const std::string& remote_path,
+ TabletManager* tablet_manager, TxnManager* txn_manager)
+ : _path_desc(path),
_capacity_bytes(capacity_bytes),
_available_bytes(0),
_disk_capacity_bytes(0),
@@ -77,9 +74,13 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes,
_tablet_manager(tablet_manager),
_txn_manager(txn_manager),
_cluster_id(-1),
+ _cluster_id_incomplete(false),
_to_be_deleted(false),
_current_shard(0),
+ _env(Env::get_env(storage_medium)),
_meta(nullptr) {
+ _path_desc.storage_medium = storage_medium;
+ _path_desc.remote_path = remote_path;
_data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
std::string("data_dir.") + path, {{"path", path}});
INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity);
@@ -97,16 +98,15 @@ DataDir::~DataDir() {
}
Status DataDir::init() {
- if (!FileUtils::check_exist(_path)) {
+ if (!Env::Default()->path_exists(_path_desc.filepath).ok()) {
RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute("opendir failed, path=$0", _path)),
+ Status::IOError(strings::Substitute("opendir failed, path=$0", _path_desc.filepath)),
"check file exist failed");
}
RETURN_NOT_OK_STATUS_WITH_WARN(update_capacity(), "update_capacity failed");
RETURN_NOT_OK_STATUS_WITH_WARN(_init_cluster_id(), "_init_cluster_id failed");
RETURN_NOT_OK_STATUS_WITH_WARN(_init_capacity(), "_init_capacity failed");
- RETURN_NOT_OK_STATUS_WITH_WARN(_init_file_system(), "_init_file_system failed");
RETURN_NOT_OK_STATUS_WITH_WARN(_init_meta(), "_init_meta failed");
_is_used = true;
@@ -120,162 +120,87 @@ void DataDir::stop_bg_worker() {
}
+// Loads the cluster id from the local cluster-id file (through Env::Default()) and,
+// for a remote data dir, also from the remote copy (through _env).
+// A missing or empty file yields -1 and sets _cluster_id_incomplete so that
+// set_cluster_id() writes the id later. Differing local and remote ids are an
+// error; when only the remote id is known it is adopted as _cluster_id.
+// NOTE(review): the removed code created the file and held flock(LOCK_EX | LOCK_NB)
+// on it; no lock is taken any more — confirm nothing relied on that to stop two
+// processes from sharing a data dir.
Status DataDir::_init_cluster_id() {
- std::string cluster_id_path = _path + CLUSTER_ID_PREFIX;
- if (access(cluster_id_path.c_str(), F_OK) != 0) {
- int fd = open(cluster_id_path.c_str(), O_RDWR | O_CREAT,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
- if (fd < 0 || close(fd) < 0) {
- RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError(strings::Substitute(
- "failed to create cluster id file $0, err=$1",
- cluster_id_path, errno_to_string(errno))),
- "create file failed");
+ FilePathDescStream path_desc_s;
+ path_desc_s << _path_desc << CLUSTER_ID_PREFIX;
+ FilePathDesc cluster_id_path_desc = path_desc_s.path_desc();
+ RETURN_IF_ERROR(read_cluster_id(Env::Default(), cluster_id_path_desc.filepath, &_cluster_id));
+ if (_cluster_id == -1) {
+ _cluster_id_incomplete = true;
+ }
+ if (is_remote()) {
+ // Assigned by read_cluster_id() on every OK return.
+ int32_t remote_cluster_id;
+ RETURN_IF_ERROR(read_cluster_id(_env, cluster_id_path_desc.remote_path, &remote_cluster_id));
+ if (remote_cluster_id == -1) {
+ _cluster_id_incomplete = true;
+ }
+ if (remote_cluster_id != -1 && _cluster_id != -1 && _cluster_id != remote_cluster_id) {
+ return Status::InternalError(strings::Substitute(
+ "cluster id $0 is not equal with remote cluster id $1",
+ _cluster_id, remote_cluster_id));
+ }
+ if (_cluster_id == -1) {
+ _cluster_id = remote_cluster_id;
}
}
- // obtain lock of all cluster id paths
- FILE* fp = nullptr;
- fp = fopen(cluster_id_path.c_str(), "r+b");
- if (fp == nullptr) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(
- strings::Substitute("failed to open cluster id file $0", cluster_id_path)),
- "open file failed");
- }
-
- int lock_res = flock(fp->_fileno, LOCK_EX | LOCK_NB);
- if (lock_res < 0) {
- fclose(fp);
- fp = nullptr;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(
- strings::Substitute("failed to flock cluster id file $0", cluster_id_path)),
- "flock file failed");
- }
-
- // obtain cluster id of all root paths
- auto st = _read_cluster_id(cluster_id_path, &_cluster_id);
- fclose(fp);
- return st;
+ return Status::OK();
}
-Status DataDir::_read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id) {
- int32_t tmp_cluster_id = -1;
-
- std::fstream fs(cluster_id_path.c_str(), std::fstream::in);
- if (!fs.is_open()) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(
- strings::Substitute("failed to open cluster id file $0", cluster_id_path)),
- "open file failed");
- }
-
- fs >> tmp_cluster_id;
- fs.close();
-
- if (tmp_cluster_id == -1 && (fs.rdstate() & std::fstream::eofbit) != 0) {
+// Reads the cluster id stored at `cluster_id_path` through `env`.
+// A missing, empty or whitespace-only file yields *cluster_id = -1 (id not assigned).
+// Content that is not a single integer >= -1 (surrounding whitespace allowed) is
+// reported as Corruption. Previously std::stoi could throw out of this function on
+// such content, and it silently accepted trailing garbage.
+// Errors from the existence check other than NotFound are logged and returned.
+Status DataDir::read_cluster_id(Env* env, const std::string& cluster_id_path, int32_t* cluster_id) {
+    std::unique_ptr<RandomAccessFile> input_file;
+    Status exist_status = env->path_exists(cluster_id_path);
+    if (exist_status.ok()) {
+        Status status = env->new_random_access_file(cluster_id_path, &input_file);
+        RETURN_NOT_OK_STATUS_WITH_WARN(status, strings::Substitute("open file failed: $0, err=$1",
+                                                                   cluster_id_path, status.to_string()));
+        std::string content;
+        RETURN_IF_ERROR(input_file->read_all(&content));
+        const char* const kWhitespace = " \t\r\n";
+        if (content.find_first_not_of(kWhitespace) == std::string::npos) {
+            // Nothing written yet: the cluster id has not been assigned.
+            *cluster_id = -1;
+        } else {
+            size_t parsed_len = 0;
+            int32_t parsed_id = -1;
+            try {
+                parsed_id = std::stoi(content, &parsed_len);
+            } catch (const std::exception&) {
+                // invalid_argument / out_of_range: treated as corruption below.
+                parsed_len = 0;
+            }
+            if (parsed_len == 0 || parsed_id < -1 ||
+                content.find_first_not_of(kWhitespace, parsed_len) != std::string::npos) {
+                RETURN_NOT_OK_STATUS_WITH_WARN(
+                        Status::Corruption(strings::Substitute(
+                                "cluster id file $0 is corrupt, content=$1", cluster_id_path, content)),
+                        "file content is error");
+            }
+            *cluster_id = parsed_id;
+        }
+    } else if (exist_status.is_not_found()) {
*cluster_id = -1;
- } else if (tmp_cluster_id >= 0 && (fs.rdstate() & std::fstream::eofbit) != 0) {
- *cluster_id = tmp_cluster_id;
} else {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::Corruption(strings::Substitute(
- "cluster id file $0 is corrupt. [id=$1 eofbit=$2 failbit=$3 badbit=$4]",
- cluster_id_path, tmp_cluster_id, fs.rdstate() & std::fstream::eofbit,
- fs.rdstate() & std::fstream::failbit, fs.rdstate() & std::fstream::badbit)),
- "file content is error");
+        RETURN_NOT_OK_STATUS_WITH_WARN(exist_status, strings::Substitute("check exist failed: $0, err=$1",
+                                                                         cluster_id_path, exist_status.to_string()));
}
return Status::OK();
}
+// Resolves the usable capacity of this data dir and makes sure the data
+// sub-directory (<path><DATA_PREFIX>) exists.
+// _capacity_bytes == -1 means "use the whole disk"; a configured capacity larger
+// than the disk capacity is rejected with InvalidArgument. The data directory is
+// created with create_dirs() only when path_exists() reports NotFound; any other
+// existence-check error or a failed creation is returned as IOError.
Status DataDir::_init_capacity() {
- std::filesystem::path boost_path = _path;
- int64_t disk_capacity = std::filesystem::space(boost_path).capacity;
+ // disk_capacity must start <= 0: PosixEnv::get_space_info only fills the capacity
+ // output when the value passed in is <= 0.
+ int64_t disk_capacity = -1;
+ int64_t available = -1;
+ RETURN_NOT_OK_STATUS_WITH_WARN(Env::Default()->get_space_info(_path_desc.filepath, &disk_capacity, &available),
+ strings::Substitute("get_space_info failed: $0", _path_desc.filepath));
if (_capacity_bytes == -1) {
_capacity_bytes = disk_capacity;
} else if (_capacity_bytes > disk_capacity) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::InvalidArgument(strings::Substitute(
- "root path $0's capacity $1 should not larger than disk capacity $2", _path,
+ "root path $0's capacity $1 should not larger than disk capacity $2", _path_desc.filepath,
_capacity_bytes, disk_capacity)),
"init capacity failed");
}
- std::string data_path = _path + DATA_PREFIX;
- if (!FileUtils::check_exist(data_path) && !FileUtils::create_dir(data_path).ok()) {
+ std::string data_path = _path_desc.filepath + DATA_PREFIX;
+ // Create the data directory only when it is reported missing (NotFound).
+ Status exist_status = Env::Default()->path_exists(data_path);
+ if (!exist_status.ok() && (!exist_status.is_not_found() || !Env::Default()->create_dirs(data_path).ok())) {
RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError(strings::Substitute(
- "failed to create data root path $0", data_path)),
- "check_exist failed");
- }
-
- return Status::OK();
-}
-
-Status DataDir::_init_file_system() {
- struct stat s;
- if (stat(_path.c_str(), &s) != 0) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute("stat file $0 failed, err=$1", _path,
- errno_to_string(errno))),
- "stat file failed");
- }
-
- dev_t mount_device;
- if ((s.st_mode & S_IFMT) == S_IFBLK) {
- mount_device = s.st_rdev;
- } else {
- mount_device = s.st_dev;
- }
-
- FILE* mount_tablet = nullptr;
- if ((mount_tablet = setmntent(kMtabPath, "r")) == nullptr) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute("setmntent file $0 failed, err=$1", _path,
- errno_to_string(errno))),
- "setmntent file failed");
- }
-
- bool is_find = false;
- struct mntent* mount_entry = nullptr;
- struct mntent ent;
- char buf[1024];
- while ((mount_entry = getmntent_r(mount_tablet, &ent, buf, sizeof(buf))) != nullptr) {
- if (strcmp(_path.c_str(), mount_entry->mnt_dir) == 0 ||
- strcmp(_path.c_str(), mount_entry->mnt_fsname) == 0) {
- is_find = true;
- break;
- }
-
- if (stat(mount_entry->mnt_fsname, &s) == 0 && s.st_rdev == mount_device) {
- is_find = true;
- break;
- }
-
- if (stat(mount_entry->mnt_dir, &s) == 0 && s.st_dev == mount_device) {
- is_find = true;
- break;
- }
- }
-
- endmntent(mount_tablet);
-
- if (!is_find) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute("file system $0 not found", _path)),
- "find file system failed");
+ "failed to create data root path $0", data_path)), "create_dirs failed");
}
- _file_system = mount_entry->mnt_fsname;
-
return Status::OK();
}
Status DataDir::_init_meta() {
// init path hash
- _path_hash = hash_of_path(BackendOptions::get_localhost(), _path);
- LOG(INFO) << "path: " << _path << ", hash: " << _path_hash;
+ _path_hash = hash_of_path(BackendOptions::get_localhost(), _path_desc.filepath);
+ LOG(INFO) << "path: " << _path_desc.filepath << ", hash: " << _path_hash;
// init meta
- _meta = new (std::nothrow) OlapMeta(_path);
+ _meta = new (std::nothrow) OlapMeta(_path_desc.filepath);
if (_meta == nullptr) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::MemoryAllocFailed("allocate memory for OlapMeta failed"),
@@ -284,32 +209,41 @@ Status DataDir::_init_meta() {
OLAPStatus res = _meta->init();
if (res != OLAP_SUCCESS) {
RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute("open rocksdb failed, path=$0", _path)),
+ Status::IOError(strings::Substitute("open rocksdb failed, path=$0", _path_desc.filepath)),
"init OlapMeta failed");
}
return Status::OK();
}
Status DataDir::set_cluster_id(int32_t cluster_id) {
- if (_cluster_id != -1) {
- if (_cluster_id == cluster_id) {
- return Status::OK();
- }
+ if (_cluster_id != -1 && _cluster_id != cluster_id) {
LOG(ERROR) << "going to set cluster id to already assigned store, cluster_id="
<< _cluster_id << ", new_cluster_id=" << cluster_id;
return Status::InternalError("going to set cluster id to already assigned store");
}
- return _write_cluster_id_to_path(_cluster_id_path(), cluster_id);
+ if (!_cluster_id_incomplete) {
+ return Status::OK();
+ }
+ FilePathDescStream path_desc_s;
+ path_desc_s << _path_desc << CLUSTER_ID_PREFIX;
+ return _write_cluster_id_to_path(path_desc_s.path_desc(), cluster_id);
}
-Status DataDir::_write_cluster_id_to_path(const std::string& path, int32_t cluster_id) {
- std::fstream fs(path.c_str(), std::fstream::out);
- if (!fs.is_open()) {
- LOG(WARNING) << "fail to open cluster id path. path=" << path;
- return Status::InternalError("IO Error");
+Status DataDir::_write_cluster_id_to_path(const FilePathDesc& path_desc, int32_t cluster_id) {
+ std::stringstream cluster_id_ss;
+ cluster_id_ss << cluster_id;
+ std::unique_ptr<WritableFile> wfile;
+ if (!Env::Default()->path_exists(path_desc.filepath).ok()) {
+ RETURN_IF_ERROR(env_util::write_string_to_file_sync(Env::Default(), Slice(cluster_id_ss.str()), path_desc.filepath));
+ }
+ if (_env->is_remote_env()) {
+ Status exist_status = _env->path_exists(path_desc.remote_path);
+ if (exist_status.is_not_found()) {
+ RETURN_IF_ERROR(env_util::write_string_to_file_sync(_env, Slice(cluster_id_ss.str()), path_desc.remote_path));
+ } else {
+ RETURN_IF_ERROR(exist_status);
+ }
}
- fs << cluster_id;
- fs.close();
return Status::OK();
}
@@ -318,7 +252,7 @@ void DataDir::health_check() {
if (_is_used) {
OLAPStatus res = OLAP_SUCCESS;
if ((res = _read_and_write_test_file()) != OLAP_SUCCESS) {
- LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path;
+ LOG(WARNING) << "store read/write test file occur IO Error. path=" << _path_desc.filepath;
if (is_io_error(res)) {
_is_used = false;
}
@@ -328,9 +262,8 @@ void DataDir::health_check() {
}
OLAPStatus DataDir::_read_and_write_test_file() {
- std::string test_file = _path + kTestFilePath;
+ std::string test_file = _path_desc.filepath + kTestFilePath;
return read_write_test_file(test_file);
- ;
}
OLAPStatus DataDir::get_shard(uint64_t* shard) {
@@ -341,12 +274,10 @@ OLAPStatus DataDir::get_shard(uint64_t* shard) {
next_shard = _current_shard;
_current_shard = (_current_shard + 1) % MAX_SHARD_NUM;
}
- shard_path_stream << _path << DATA_PREFIX << "/" << next_shard;
+ shard_path_stream << _path_desc.filepath << DATA_PREFIX << "/" << next_shard;
std::string shard_path = shard_path_stream.str();
- if (!FileUtils::check_exist(shard_path)) {
- RETURN_WITH_WARN_IF_ERROR(FileUtils::create_dir(shard_path), OLAP_ERR_CANNOT_CREATE_DIR,
- "fail to create path. path=" + shard_path);
- }
+ RETURN_WITH_WARN_IF_ERROR(Env::Default()->create_dirs(shard_path), OLAP_ERR_CANNOT_CREATE_DIR,
+ "fail to create path. path=" + shard_path);
*shard = next_shard;
return OLAP_SUCCESS;
@@ -374,7 +305,7 @@ void DataDir::clear_tablets(std::vector<TabletInfo>* tablet_infos) {
}
std::string DataDir::get_absolute_shard_path(int64_t shard_id) {
- return strings::Substitute("$0$1/$2", _path, DATA_PREFIX, shard_id);
+ return strings::Substitute("$0$1/$2", _path_desc.filepath, DATA_PREFIX, shard_id);
}
std::string DataDir::get_absolute_tablet_path(int64_t shard_id, int64_t tablet_id,
@@ -385,7 +316,7 @@ std::string DataDir::get_absolute_tablet_path(int64_t shard_id, int64_t tablet_i
void DataDir::find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* paths) {
// path: /root_path/trash/time_label/tablet_id/schema_hash
- std::string trash_path = _path + TRASH_PREFIX;
+ std::string trash_path = _path_desc.filepath + TRASH_PREFIX;
std::vector<std::string> sub_dirs;
FileUtils::list_files(Env::Default(), trash_path, &sub_dirs);
for (auto& sub_dir : sub_dirs) {
@@ -417,31 +348,31 @@ OLAPStatus DataDir::_clean_unfinished_converting_data() {
const std::string& value) -> bool {
TabletMetaManager::remove(this, tablet_id, schema_hash, HEADER_PREFIX);
LOG(INFO) << "successfully clean temp tablet meta for tablet=" << tablet_id << "."
- << schema_hash << "from data dir: " << _path;
+ << schema_hash << "from data dir: " << _path_desc.filepath;
return true;
};
OLAPStatus clean_unfinished_meta_status = TabletMetaManager::traverse_headers(
_meta, clean_unifinished_tablet_meta_func, HEADER_PREFIX);
if (clean_unfinished_meta_status != OLAP_SUCCESS) {
// If failed to clean meta just skip the error, there will be useless metas in rocksdb column family
- LOG(WARNING) << "there is failure when clean temp tablet meta from data dir=" << _path;
+ LOG(WARNING) << "there is failure when clean temp tablet meta from data dir=" << _path_desc.filepath;
} else {
- LOG(INFO) << "successfully clean temp tablet meta from data dir=" << _path;
+ LOG(INFO) << "successfully clean temp tablet meta from data dir=" << _path_desc.filepath;
}
auto clean_unifinished_rowset_meta_func = [this](TabletUid tablet_uid, RowsetId rowset_id,
const std::string& value) -> bool {
RowsetMetaManager::remove(_meta, tablet_uid, rowset_id);
LOG(INFO) << "successfully clean temp rowset meta for rowset_id=" << rowset_id
- << " from data dir=" << _path;
+ << " from data dir=" << _path_desc.filepath;
return true;
};
OLAPStatus clean_unfinished_rowset_meta_status =
RowsetMetaManager::traverse_rowset_metas(_meta, clean_unifinished_rowset_meta_func);
if (clean_unfinished_rowset_meta_status != OLAP_SUCCESS) {
// If failed to clean meta just skip the error, there will be useless metas in rocksdb column family
- LOG(FATAL) << "fail to clean temp rowset meta from data dir=" << _path;
+ LOG(FATAL) << "fail to clean temp rowset meta from data dir=" << _path_desc.filepath;
} else {
- LOG(INFO) << "success to clean temp rowset meta from data dir=" << _path;
+ LOG(INFO) << "success to clean temp rowset meta from data dir=" << _path_desc.filepath;
}
return OLAP_SUCCESS;
}
@@ -453,7 +384,7 @@ bool DataDir::convert_old_data_success() {
OLAPStatus DataDir::set_convert_finished() {
OLAPStatus res = _meta->set_tablet_convert_finished();
if (res != OLAP_SUCCESS) {
- LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path;
+ LOG(FATAL) << "save convert flag failed after convert old tablet. dir=" << _path_desc.filepath;
return res;
}
return OLAP_SUCCESS;
@@ -482,16 +413,16 @@ OLAPStatus DataDir::_check_incompatible_old_format_tablet() {
_meta, check_incompatible_old_func, OLD_HEADER_PREFIX);
if (check_incompatible_old_status != OLAP_SUCCESS) {
LOG(WARNING) << "check incompatible old format meta fails, it may lead to data missing!!! "
- << _path;
+ << _path_desc.filepath;
} else {
- LOG(INFO) << "successfully check incompatible old format meta " << _path;
+ LOG(INFO) << "successfully check incompatible old format meta " << _path_desc.filepath;
}
return check_incompatible_old_status;
}
// TODO(ygl): deal with rowsets and tablets when load failed
OLAPStatus DataDir::load() {
- LOG(INFO) << "start to load tablets from " << _path;
+ LOG(INFO) << "start to load tablets from " << _path_desc.filepath;
// load rowset meta from meta env and create rowset
// COMMITTED: add to txn manager
// VISIBLE: add to tablet
@@ -518,9 +449,9 @@ OLAPStatus DataDir::load() {
RowsetMetaManager::traverse_rowset_metas(_meta, load_rowset_func);
if (load_rowset_status != OLAP_SUCCESS) {
- LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path;
+ LOG(WARNING) << "errors when load rowset meta from meta env, skip this data dir:" << _path_desc.filepath;
} else {
- LOG(INFO) << "load rowset from meta finished, data dir: " << _path;
+ LOG(INFO) << "load rowset from meta finished, data dir: " << _path_desc.filepath;
}
// load tablet
@@ -561,19 +492,19 @@ OLAPStatus DataDir::load() {
if (failed_tablet_ids.size() != 0) {
LOG(WARNING) << "load tablets from header failed"
<< ", loaded tablet: " << tablet_ids.size()
- << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
+ << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path_desc.filepath;
if (!config::ignore_load_tablet_failure) {
- LOG(FATAL) << "load tablets encounter failure. stop BE process. path: " << _path;
+ LOG(FATAL) << "load tablets encounter failure. stop BE process. path: " << _path_desc.filepath;
}
}
if (load_tablet_status != OLAP_SUCCESS) {
LOG(WARNING) << "there is failure when loading tablet headers"
<< ", loaded tablet: " << tablet_ids.size()
- << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
+ << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path_desc.filepath;
} else {
LOG(INFO) << "load tablet from meta finished"
<< ", loaded tablet: " << tablet_ids.size()
- << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path;
+ << ", error tablet: " << failed_tablet_ids.size() << ", path: " << _path_desc.filepath;
}
// traverse rowset
@@ -592,7 +523,7 @@ OLAPStatus DataDir::load() {
}
RowsetSharedPtr rowset;
OLAPStatus create_status = RowsetFactory::create_rowset(
- &tablet->tablet_schema(), tablet->tablet_path(), rowset_meta, &rowset);
+ &tablet->tablet_schema(), tablet->tablet_path_desc(), rowset_meta, &rowset);
if (create_status != OLAP_SUCCESS) {
LOG(WARNING) << "could not create rowset from rowsetmeta: "
<< " rowset_id: " << rowset_meta->rowset_id()
@@ -751,9 +682,9 @@ void DataDir::perform_path_scan() {
LOG(INFO) << "_all_check_paths is not empty when path scan.";
return;
}
- LOG(INFO) << "start to scan data dir path:" << _path;
+ LOG(INFO) << "start to scan data dir path:" << _path_desc.filepath;
std::set<std::string> shards;
- std::string data_path = _path + DATA_PREFIX;
+ std::string data_path = _path_desc.filepath + DATA_PREFIX;
Status ret = FileUtils::list_dirs_files(data_path, &shards, nullptr, Env::Default());
if (!ret.ok()) {
@@ -800,13 +731,13 @@ void DataDir::perform_path_scan() {
}
}
}
- LOG(INFO) << "scan data dir path: " << _path << " finished. path size: "
- << _all_check_paths.size() + _all_tablet_schemahash_paths.size();
+ LOG(INFO) << "scan data dir path: " << _path_desc.filepath
+ << " finished. path size: " << _all_check_paths.size() + _all_tablet_schemahash_paths.size();
_check_path_cv.notify_one();
}
void DataDir::_process_garbage_path(const std::string& path) {
- if (FileUtils::check_exist(path)) {
+ if (_env->path_exists(path).ok()) {
LOG(INFO) << "collect garbage dir path: " << path;
WARN_IF_ERROR(FileUtils::remove_all(path), "remove garbage dir failed. path: " + path);
}
@@ -818,24 +749,18 @@ bool DataDir::_check_pending_ids(const std::string& id) {
}
Status DataDir::update_capacity() {
- try {
- std::filesystem::path path_name(_path);
- std::filesystem::space_info path_info = std::filesystem::space(path_name);
- _available_bytes = path_info.available;
- if (_disk_capacity_bytes == 0) {
- // disk capacity only need to be set once
- _disk_capacity_bytes = path_info.capacity;
- }
- } catch (std::filesystem::filesystem_error& e) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- Status::IOError(strings::Substitute(
- "get path $0 available capacity failed, error=$1", _path, e.what())),
- "std::filesystem::space failed");
+ RETURN_NOT_OK_STATUS_WITH_WARN(_env->get_space_info(_path_desc.filepath, &_disk_capacity_bytes, &_available_bytes),
+ strings::Substitute("get_space_info failed: $0", _path_desc.filepath));
+ if (_disk_capacity_bytes < 0) {
+ _disk_capacity_bytes = _capacity_bytes;
+ }
+ if (_available_bytes < 0) {
+ _available_bytes = _capacity_bytes;
}
disks_total_capacity->set_value(_disk_capacity_bytes);
disks_avail_capacity->set_value(_available_bytes);
- LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes
+ LOG(INFO) << "path: " << _path_desc.filepath << " total capacity: " << _disk_capacity_bytes
<< ", available capacity: " << _available_bytes;
return Status::OK();
@@ -858,7 +783,7 @@ bool DataDir::reach_capacity_limit(int64_t incoming_data_size) {
if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
- << ", left bytes: " << left_bytes << ", path: " << _path;
+ << ", left bytes: " << left_bytes << ", path: " << _path_desc.filepath;
return true;
}
return false;
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index cd9844f..845a336 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -24,6 +24,7 @@
#include <string>
#include "common/status.h"
+#include "env/env.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
@@ -44,21 +45,24 @@ class DataDir {
public:
DataDir(const std::string& path, int64_t capacity_bytes = -1,
TStorageMedium::type storage_medium = TStorageMedium::HDD,
+ const std::string& remote_path = "",
TabletManager* tablet_manager = nullptr, TxnManager* txn_manager = nullptr);
~DataDir();
Status init();
void stop_bg_worker();
- const std::string& path() const { return _path; }
+ const std::string& path() const { return _path_desc.filepath; }
+ const FilePathDesc& path_desc() const { return _path_desc;}
size_t path_hash() const { return _path_hash; }
bool is_used() const { return _is_used; }
void set_is_used(bool is_used) { _is_used = is_used; }
int32_t cluster_id() const { return _cluster_id; }
+ bool cluster_id_incomplete() const { return _cluster_id_incomplete; }
DataDirInfo get_dir_info() {
DataDirInfo info;
- info.path = _path;
+ info.path_desc = _path_desc;
info.path_hash = _path_hash;
info.disk_capacity = _disk_capacity_bytes;
info.available = _available_bytes;
@@ -79,6 +83,8 @@ public:
bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
+ bool is_remote() const { return _env->is_remote_env(); }
+
TStorageMedium::type storage_medium() const { return _storage_medium; }
void register_tablet(Tablet* tablet);
@@ -130,17 +136,19 @@ public:
void disks_compaction_num_increment(int64_t delta);
+ Env* env() {
+ return _env;
+ }
+
private:
- std::string _cluster_id_path() const { return _path + CLUSTER_ID_PREFIX; }
Status _init_cluster_id();
Status _init_capacity();
- Status _init_file_system();
Status _init_meta();
Status _check_disk();
OLAPStatus _read_and_write_test_file();
- Status _read_cluster_id(const std::string& cluster_id_path, int32_t* cluster_id);
- Status _write_cluster_id_to_path(const std::string& path, int32_t cluster_id);
+ Status read_cluster_id(Env* env, const std::string& cluster_id_path, int32_t* cluster_id);
+ Status _write_cluster_id_to_path(const FilePathDesc& path_desc, int32_t cluster_id);
OLAPStatus _clean_unfinished_converting_data();
// Check whether has old format (hdr_ start) in olap. When doris updating to current version,
// it may lead to data missing. When conf::storage_strict_check_incompatible_old_format is true,
@@ -156,7 +164,7 @@ private:
private:
bool _stop_bg_worker = false;
- std::string _path;
+ FilePathDesc _path_desc;
size_t _path_hash;
// user specified capacity
int64_t _capacity_bytes;
@@ -168,11 +176,12 @@ private:
int64_t _disk_capacity_bytes;
TStorageMedium::type _storage_medium;
bool _is_used;
+ Env* _env = nullptr;
- std::string _file_system;
TabletManager* _tablet_manager;
TxnManager* _txn_manager;
int32_t _cluster_id;
+ bool _cluster_id_incomplete = false;
// This flag will be set true if this store was not in root path when reloading
bool _to_be_deleted;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 42712a6..c9467dd 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -134,7 +134,7 @@ OLAPStatus DeltaWriter::init() {
} else {
writer_context.rowset_type = ALPHA_ROWSET;
}
- writer_context.rowset_path_prefix = _tablet->tablet_path();
+ writer_context.path_desc = _tablet->tablet_path_desc();
writer_context.tablet_schema = &(_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
writer_context.txn_id = _req.txn_id;
diff --git a/be/src/olap/fs/CMakeLists.txt b/be/src/olap/fs/CMakeLists.txt
index 107e66b..83f27a3 100644
--- a/be/src/olap/fs/CMakeLists.txt
+++ b/be/src/olap/fs/CMakeLists.txt
@@ -27,4 +27,5 @@ add_library(OlapFs STATIC
block_manager.cpp
fs_util.cpp
file_block_manager.cpp
+ remote_block_manager.cpp
)
diff --git a/be/src/olap/fs/block_manager.h b/be/src/olap/fs/block_manager.h
index 75ab135..55be413 100644
--- a/be/src/olap/fs/block_manager.h
+++ b/be/src/olap/fs/block_manager.h
@@ -24,6 +24,7 @@
#include <vector>
#include "common/status.h"
+#include "env/env.h"
namespace doris {
@@ -57,7 +58,7 @@ public:
// blocks correspond to a file).
// For convenience, the path interface is directly exposed. At that time, the path()
// method should be removed.
- virtual const std::string& path() const = 0;
+ virtual const FilePathDesc& path_desc() const = 0;
};
// A block that has been opened for writing. There may only be a single
@@ -170,8 +171,14 @@ public:
// used to specify directories based on block type (e.g. to prefer bloom block
// placement into SSD-backed directories).
struct CreateBlockOptions {
+ CreateBlockOptions(const FilePathDesc& new_path_desc) {
+ path_desc = new_path_desc;
+ }
+ CreateBlockOptions(const std::string& path) {
+ path_desc.filepath = path;
+ }
// const std::string tablet_id;
- const std::string path;
+ FilePathDesc path_desc;
};
// Block manager creation options.
@@ -229,7 +236,7 @@ public:
// may fail.
//
// Does not modify 'block' on error.
- virtual Status open_block(const std::string& path, std::unique_ptr<ReadableBlock>* block) = 0;
+ virtual Status open_block(const FilePathDesc& path_desc, std::unique_ptr<ReadableBlock>* block) = 0;
// Retrieves the IDs of all blocks under management by this block manager.
// These include ReadableBlocks as well as WritableBlocks.
@@ -240,6 +247,10 @@ public:
// even exist after the call.
virtual Status get_all_block_ids(std::vector<BlockId>* block_ids) = 0;
+ virtual Status delete_block(const FilePathDesc& path_desc, bool is_dir = false) = 0;
+
+ virtual Status link_file(const FilePathDesc& src_path_desc, const FilePathDesc& dest_path_desc) = 0;
+
static const std::string block_manager_preflush_control;
};
diff --git a/be/src/olap/fs/file_block_manager.cpp b/be/src/olap/fs/file_block_manager.cpp
index f62f95d..a946d28 100644
--- a/be/src/olap/fs/file_block_manager.cpp
+++ b/be/src/olap/fs/file_block_manager.cpp
@@ -63,7 +63,7 @@ namespace internal {
// FileWritableBlock instances is expected to be low.
class FileWritableBlock : public WritableBlock {
public:
- FileWritableBlock(FileBlockManager* block_manager, string path,
+ FileWritableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
shared_ptr<WritableFile> writer);
virtual ~FileWritableBlock();
@@ -75,7 +75,7 @@ public:
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
- virtual const std::string& path() const override;
+ virtual const FilePathDesc& path_desc() const override;
virtual Status append(const Slice& data) override;
@@ -106,7 +106,7 @@ private:
FileBlockManager* _block_manager;
const BlockId _block_id;
- const string _path;
+ FilePathDesc _path_desc;
// The underlying opened file backing this block.
shared_ptr<WritableFile> _writer;
@@ -117,11 +117,11 @@ private:
size_t _bytes_appended;
};
-FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager, string path,
+FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
shared_ptr<WritableFile> writer)
: _block_manager(block_manager),
- _path(std::move(path)),
- _writer(std::move(writer)),
+ _path_desc(path_desc),
+ _writer(writer),
_state(CLEAN),
_bytes_appended(0) {
if (_block_manager->_metrics) {
@@ -132,7 +132,7 @@ FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager, string pat
FileWritableBlock::~FileWritableBlock() {
if (_state != CLOSED) {
- WARN_IF_ERROR(abort(), strings::Substitute("Failed to close block $0", _path));
+ WARN_IF_ERROR(abort(), strings::Substitute("Failed to close block $0", _path_desc.filepath));
}
}
@@ -142,7 +142,7 @@ Status FileWritableBlock::close() {
Status FileWritableBlock::abort() {
RETURN_IF_ERROR(_close(NO_SYNC));
- return _block_manager->_delete_block(_path);
+ return _block_manager->delete_block(_path_desc);
}
BlockManager* FileWritableBlock::block_manager() const {
@@ -154,8 +154,8 @@ const BlockId& FileWritableBlock::id() const {
return _block_id;
}
-const string& FileWritableBlock::path() const {
- return _path;
+const FilePathDesc& FileWritableBlock::path_desc() const {
+ return _path_desc;
}
Status FileWritableBlock::append(const Slice& data) {
@@ -163,7 +163,7 @@ Status FileWritableBlock::append(const Slice& data) {
}
Status FileWritableBlock::appendv(const Slice* data, size_t data_cnt) {
- DCHECK(_state == CLEAN || _state == DIRTY) << "path=" << _path << " invalid state=" << _state;
+ DCHECK(_state == CLEAN || _state == DIRTY) << "path=" << _path_desc.filepath << " invalid state=" << _state;
RETURN_IF_ERROR(_writer->appendv(data, data_cnt));
_state = DIRTY;
@@ -175,19 +175,19 @@ Status FileWritableBlock::appendv(const Slice* data, size_t data_cnt) {
}
Status FileWritableBlock::flush_data_async() {
- VLOG_NOTICE << "Flushing block " << _path;
+ VLOG_NOTICE << "Flushing block " << _path_desc.filepath;
RETURN_IF_ERROR(_writer->flush(WritableFile::FLUSH_ASYNC));
return Status::OK();
}
Status FileWritableBlock::finalize() {
DCHECK(_state == CLEAN || _state == DIRTY || _state == FINALIZED)
- << "path=" << _path << "Invalid state: " << _state;
+ << "path=" << _path_desc.filepath << "Invalid state: " << _state;
if (_state == FINALIZED) {
return Status::OK();
}
- VLOG_NOTICE << "Finalizing block " << _path;
+ VLOG_NOTICE << "Finalizing block " << _path_desc.filepath;
if (_state == DIRTY && BlockManager::block_manager_preflush_control == "finalize") {
flush_data_async();
}
@@ -211,15 +211,15 @@ Status FileWritableBlock::_close(SyncMode mode) {
Status sync;
if (mode == SYNC && (_state == CLEAN || _state == DIRTY || _state == FINALIZED)) {
// Safer to synchronize data first, then metadata.
- VLOG_NOTICE << "Syncing block " << _path;
+ VLOG_NOTICE << "Syncing block " << _path_desc.filepath;
if (_block_manager->_metrics) {
_block_manager->_metrics->total_disk_sync->increment(1);
}
sync = _writer->sync();
if (sync.ok()) {
- sync = _block_manager->_sync_metadata(_path);
+ sync = _block_manager->_sync_metadata(_path_desc.filepath);
}
- WARN_IF_ERROR(sync, strings::Substitute("Failed to sync when closing block $0", _path));
+ WARN_IF_ERROR(sync, strings::Substitute("Failed to sync when closing block $0", _path_desc.filepath));
}
Status close = _writer->close();
@@ -250,7 +250,7 @@ Status FileWritableBlock::_close(SyncMode mode) {
// embed a FileBlockLocation, using the simpler BlockId instead.
class FileReadableBlock : public ReadableBlock {
public:
- FileReadableBlock(FileBlockManager* block_manager, string path,
+ FileReadableBlock(FileBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle);
virtual ~FileReadableBlock();
@@ -260,7 +260,7 @@ public:
virtual BlockManager* block_manager() const override;
virtual const BlockId& id() const override;
- virtual const std::string& path() const override;
+ virtual const FilePathDesc& path_desc() const override;
virtual Status size(uint64_t* sz) const override;
@@ -276,7 +276,7 @@ private:
// The block's identifier.
const BlockId _block_id;
- const string _path;
+ const FilePathDesc _path_desc;
// The underlying opened file backing this block.
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> _file_handle;
@@ -291,11 +291,11 @@ private:
};
FileReadableBlock::FileReadableBlock(
- FileBlockManager* block_manager, string path,
+ FileBlockManager* block_manager, const FilePathDesc& path_desc,
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle)
: _block_manager(block_manager),
- _path(std::move(path)),
- _file_handle(std::move(file_handle)),
+ _path_desc(path_desc),
+ _file_handle(file_handle),
_closed(false) {
if (_block_manager->_metrics) {
_block_manager->_metrics->blocks_open_reading->increment(1);
@@ -305,7 +305,7 @@ FileReadableBlock::FileReadableBlock(
}
FileReadableBlock::~FileReadableBlock() {
- WARN_IF_ERROR(close(), strings::Substitute("Failed to close block $0", _path));
+ WARN_IF_ERROR(close(), strings::Substitute("Failed to close block $0", _path_desc.filepath));
}
Status FileReadableBlock::close() {
@@ -329,8 +329,8 @@ const BlockId& FileReadableBlock::id() const {
return _block_id;
}
-const string& FileReadableBlock::path() const {
- return _path;
+const FilePathDesc& FileReadableBlock::path_desc() const {
+ return _path_desc;
}
Status FileReadableBlock::size(uint64_t* sz) const {
@@ -397,35 +397,35 @@ Status FileBlockManager::create_block(const CreateBlockOptions& opts,
shared_ptr<WritableFile> writer;
WritableFileOptions wr_opts;
wr_opts.mode = Env::MUST_CREATE;
- RETURN_IF_ERROR(env_util::open_file_for_write(wr_opts, _env, opts.path, &writer));
+ RETURN_IF_ERROR(env_util::open_file_for_write(wr_opts, _env, opts.path_desc.filepath, &writer));
- VLOG_CRITICAL << "Creating new block at " << opts.path;
- block->reset(new internal::FileWritableBlock(this, opts.path, writer));
+ VLOG_CRITICAL << "Creating new block at " << opts.path_desc.filepath;
+ block->reset(new internal::FileWritableBlock(this, opts.path_desc, writer));
return Status::OK();
}
-Status FileBlockManager::open_block(const std::string& path,
+Status FileBlockManager::open_block(const FilePathDesc& path_desc,
std::unique_ptr<ReadableBlock>* block) {
- VLOG_CRITICAL << "Opening block with path at " << path;
+ VLOG_CRITICAL << "Opening block with path at " << path_desc.filepath;
std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle(
new OpenedFileHandle<RandomAccessFile>());
- bool found = _file_cache->lookup(path, file_handle.get());
+ bool found = _file_cache->lookup(path_desc.filepath, file_handle.get());
if (!found) {
std::unique_ptr<RandomAccessFile> file;
- RETURN_IF_ERROR(_env->new_random_access_file(path, &file));
- _file_cache->insert(path, file.release(), file_handle.get());
+ RETURN_IF_ERROR(_env->new_random_access_file(path_desc.filepath, &file));
+ _file_cache->insert(path_desc.filepath, file.release(), file_handle.get());
}
- block->reset(new internal::FileReadableBlock(this, path, file_handle));
+ block->reset(new internal::FileReadableBlock(this, path_desc, file_handle));
return Status::OK();
}
// TODO(lingbin): We should do something to ensure that deletion can only be done
// after the last reader or writer has finished
-Status FileBlockManager::_delete_block(const string& path) {
+Status FileBlockManager::delete_block(const FilePathDesc& path_desc, bool is_dir) {
CHECK(!_opts.read_only);
- RETURN_IF_ERROR(_env->delete_file(path));
+ RETURN_IF_ERROR(_env->delete_file(path_desc.filepath));
// We don't bother fsyncing the parent directory as there's nothing to be
// gained by ensuring that the deletion is made durable. Even if we did
@@ -439,9 +439,18 @@ Status FileBlockManager::_delete_block(const string& path) {
return Status::OK();
}
+Status FileBlockManager::link_file(const FilePathDesc& src_path_desc, const FilePathDesc& dest_path_desc) {
+ if (link(src_path_desc.filepath.c_str(), dest_path_desc.filepath.c_str()) != 0) {
+ LOG(WARNING) << "fail to create hard link. from=" << src_path_desc.filepath << ", "
+ << "to=" << dest_path_desc.filepath << ", " << "errno=" << Errno::no();
+ return Status::InternalError("link file failed");
+ }
+ return Status::OK();
+}
+
// TODO(lingbin): only one level is enough?
-Status FileBlockManager::_sync_metadata(const string& path) {
- string dir = path_util::dir_name(path);
+Status FileBlockManager::_sync_metadata(const FilePathDesc& path_desc) {
+ string dir = path_util::dir_name(path_desc.filepath);
if (_metrics) {
_metrics->total_disk_sync->increment(1);
}
diff --git a/be/src/olap/fs/file_block_manager.h b/be/src/olap/fs/file_block_manager.h
index b73da5d..118d619 100644
--- a/be/src/olap/fs/file_block_manager.h
+++ b/be/src/olap/fs/file_block_manager.h
@@ -70,27 +70,30 @@ public:
Status create_block(const CreateBlockOptions& opts,
std::unique_ptr<WritableBlock>* block) override;
- Status open_block(const std::string& path, std::unique_ptr<ReadableBlock>* block) override;
+ Status open_block(const FilePathDesc& path_desc, std::unique_ptr<ReadableBlock>* block) override;
Status get_all_block_ids(std::vector<BlockId>* block_ids) override {
// TODO(lingbin): to be implemented after we assign each block an id
return Status::OK();
};
-private:
- friend class internal::FileReadableBlock;
- friend class internal::FileWritableBlock;
-
// Deletes an existing block, allowing its space to be reclaimed by the
// filesystem. The change is immediately made durable.
//
// Blocks may be deleted while they are open for reading or writing;
// the actual deletion will take place after the last open reader or
// writer is closed.
- Status _delete_block(const std::string& path);
+ // is_dir: whether this path is a dir or file. if it is true, delete all files in this path
+ Status delete_block(const FilePathDesc& path_desc, bool is_dir = false);
+
+ Status link_file(const FilePathDesc& src_path_desc, const FilePathDesc& dest_path_desc) override;
+
+private:
+ friend class internal::FileReadableBlock;
+ friend class internal::FileWritableBlock;
// Synchronizes the metadata for a block with the given location.
- Status _sync_metadata(const std::string& path);
+ Status _sync_metadata(const FilePathDesc& path_desc);
Env* env() const { return _env; }
diff --git a/be/src/olap/fs/fs_util.cpp b/be/src/olap/fs/fs_util.cpp
index 3ff1c76..7414e76 100644
--- a/be/src/olap/fs/fs_util.cpp
+++ b/be/src/olap/fs/fs_util.cpp
@@ -19,7 +19,9 @@
#include "common/status.h"
#include "env/env.h"
+#include "env/env_remote.h"
#include "olap/fs/file_block_manager.h"
+#include "olap/fs/remote_block_manager.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
@@ -27,11 +29,33 @@ namespace doris {
namespace fs {
namespace fs_util {
-BlockManager* block_manager() {
+BlockManager* block_manager(TStorageMedium::type storage_medium) {
fs::BlockManagerOptions bm_opts;
bm_opts.read_only = false;
- static FileBlockManager block_mgr(Env::Default(), std::move(bm_opts));
- return &block_mgr;
+ switch (storage_medium) {
+ case TStorageMedium::S3:
+ bm_opts.read_only = true;
+ static RemoteBlockManager remote_block_mgr(
+ Env::Default(), dynamic_cast<RemoteEnv*>(Env::get_env(storage_medium)), bm_opts);
+ return &remote_block_mgr;
+ case TStorageMedium::SSD:
+ case TStorageMedium::HDD:
+ default:
+ static FileBlockManager block_mgr(Env::Default(), std::move(bm_opts));
+ return &block_mgr;
+ }
+}
+
+StorageMediumPB get_storage_medium_pb(TStorageMedium::type t_storage_medium) {
+ switch (t_storage_medium) {
+ case TStorageMedium::S3:
+ return StorageMediumPB::S3;
+ case TStorageMedium::SSD:
+ return StorageMediumPB::SSD;
+ case TStorageMedium::HDD:
+ default:
+ return StorageMediumPB::HDD;
+ }
}
} // namespace fs_util
diff --git a/be/src/olap/fs/fs_util.h b/be/src/olap/fs/fs_util.h
index c4c9a5e..3c02249 100644
--- a/be/src/olap/fs/fs_util.h
+++ b/be/src/olap/fs/fs_util.h
@@ -18,6 +18,8 @@
#pragma once
#include "common/status.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/Types_types.h"
#include "olap/fs/block_manager.h"
namespace doris {
@@ -26,7 +28,9 @@ namespace fs_util {
// Each BlockManager type may have different params, so we provide a separate
// method for each type(instead of a factory method which require same params)
-BlockManager* block_manager();
+BlockManager* block_manager(TStorageMedium::type storage_medium); // S3 -> process-wide RemoteBlockManager (read-only); any other medium -> process-wide FileBlockManager
+
+StorageMediumPB get_storage_medium_pb(TStorageMedium::type t_storage_medium); // S3/SSD -> same-named PB value; HDD and anything else -> HDD
} // namespace fs_util
} // namespace fs
diff --git a/be/src/olap/fs/remote_block_manager.cpp b/be/src/olap/fs/remote_block_manager.cpp
new file mode 100644
index 0000000..575c0f7
--- /dev/null
+++ b/be/src/olap/fs/remote_block_manager.cpp
@@ -0,0 +1,339 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/fs/remote_block_manager.h"
+
+#include <atomic>
+#include <cstddef>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <utility>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "env/env.h"
+#include "env/env_posix.h"
+#include "env/env_remote.h"
+#include "env/env_util.h"
+#include "gutil/strings/substitute.h"
+#include "olap/fs/block_id.h"
+
+using std::shared_ptr;
+using std::string;
+
+using strings::Substitute;
+
+namespace doris {
+namespace fs {
+
+namespace internal {
+
+////////////////////////////////////////////////////////////
+// RemoteWritableBlock
+////////////////////////////////////////////////////////////
+
+// A remote-backed block that has been opened for writing.
+//
+// RemoteBlockManager::create_block constructs it with a writer for the
+// block's local file. The write path is not implemented yet: append, finalize,
+// close and abort currently return IOError.
+class RemoteWritableBlock : public WritableBlock {
+public:
+    RemoteWritableBlock(RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
+                        shared_ptr<WritableFile> local_writer);
+
+    virtual ~RemoteWritableBlock();
+
+    virtual Status close() override;
+
+    virtual Status abort() override;
+
+    virtual BlockManager* block_manager() const override;
+
+    virtual const BlockId& id() const override;
+    virtual const FilePathDesc& path_desc() const override;
+
+    virtual Status append(const Slice& data) override;
+
+    virtual Status appendv(const Slice* data, size_t data_cnt) override;
+
+    virtual Status finalize() override;
+
+    virtual size_t bytes_appended() const override;
+
+    virtual State state() const override;
+
+    void handle_error(const Status& s) const;
+
+    // Starts an asynchronous flush of dirty block data (not implemented yet).
+    Status flush_data_async();
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(RemoteWritableBlock);
+
+    enum SyncMode { SYNC, NO_SYNC };
+
+    // Close the block, optionally synchronizing dirty data and metadata.
+    Status _close(SyncMode mode);
+
+    // Back pointer to the block manager.
+    //
+    // Should remain alive for the lifetime of this block.
+    RemoteBlockManager* _block_manager = nullptr;
+
+    const BlockId _block_id;
+    FilePathDesc _path_desc;
+
+    // Writer for the block's local file.
+    shared_ptr<WritableFile> _local_writer;
+
+    // Initialized here so state() is well-defined before any write happens.
+    State _state = CLEAN;
+
+    // The number of bytes successfully appended to the block.
+    size_t _bytes_appended = 0;
+};
+
+// Binds the block to its manager, path descriptor and local writer.
+// The block starts CLEAN with nothing appended, so state() and
+// bytes_appended() return defined values.
+RemoteWritableBlock::RemoteWritableBlock(RemoteBlockManager* block_manager,
+                                         const FilePathDesc& path_desc,
+                                         shared_ptr<WritableFile> local_writer)
+        : _block_manager(block_manager),
+          _path_desc(path_desc),
+          _local_writer(std::move(local_writer)),
+          _state(CLEAN),
+          _bytes_appended(0) {}
+
+// Every member releases its own resources.
+RemoteWritableBlock::~RemoteWritableBlock() = default;
+
+// Closing and aborting are not implemented for remote blocks yet.
+Status RemoteWritableBlock::close() { return Status::IOError("invalid function", 0, ""); }
+
+Status RemoteWritableBlock::abort() { return Status::IOError("invalid function", 0, ""); }
+
+BlockManager* RemoteWritableBlock::block_manager() const { return _block_manager; }
+
+// Remote blocks carry no BlockId yet, so asking for one aborts via CHECK.
+const BlockId& RemoteWritableBlock::id() const {
+    CHECK(false) << "Not support Block.id(). (TODO)";
+    return _block_id;
+}
+
+const FilePathDesc& RemoteWritableBlock::path_desc() const { return _path_desc; }
+
+// A single slice is forwarded as a one-element appendv().
+Status RemoteWritableBlock::append(const Slice& data) { return appendv(&data, 1); }
+
+// The remaining write-path operations are not implemented yet.
+Status RemoteWritableBlock::appendv(const Slice* data, size_t data_cnt) {
+    return Status::IOError("invalid function", 0, "");
+}
+
+Status RemoteWritableBlock::flush_data_async() {
+    return Status::IOError("invalid function", 0, "");
+}
+
+Status RemoteWritableBlock::finalize() { return Status::IOError("invalid function", 0, ""); }
+
+size_t RemoteWritableBlock::bytes_appended() const { return _bytes_appended; }
+
+WritableBlock::State RemoteWritableBlock::state() const { return _state; }
+
+// Not implemented: neither SYNC nor NO_SYNC closing is supported yet.
+Status RemoteWritableBlock::_close(SyncMode mode) {
+    return Status::IOError("invalid function", 0, "");
+}
+
+////////////////////////////////////////////////////////////
+// RemoteReadableBlock
+////////////////////////////////////////////////////////////
+
+// A remote-backed block that has been opened for reading.
+//
+// RemoteBlockManager::open_block hands over a cached handle to the block's
+// local file when that file exists, and a null handle otherwise. Reading is
+// not implemented yet: size() and readv() currently return IOError.
+class RemoteReadableBlock : public ReadableBlock {
+public:
+    RemoteReadableBlock(RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
+                        std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle);
+
+    virtual ~RemoteReadableBlock();
+
+    virtual Status close() override;
+
+    virtual BlockManager* block_manager() const override;
+
+    virtual const BlockId& id() const override;
+    virtual const FilePathDesc& path_desc() const override;
+
+    virtual Status size(uint64_t* sz) const override;
+
+    virtual Status read(uint64_t offset, Slice result) const override;
+
+    virtual Status readv(uint64_t offset, const Slice* results, size_t res_cnt) const override;
+
+    void handle_error(const Status& s) const;
+
+private:
+    // Back pointer to the owning block manager.
+    RemoteBlockManager* _block_manager = nullptr;
+
+    // The block's identifier (not assigned for remote blocks yet).
+    const BlockId _block_id;
+    const FilePathDesc _path_desc;
+
+    // Handle to the opened local file, or null when no local copy exists.
+    std::shared_ptr<OpenedFileHandle<RandomAccessFile>> _file_handle;
+    // The backing file of _file_handle, not owned; null without a handle.
+    RandomAccessFile* _file = nullptr;
+
+    // Whether or not this block has been closed.
+    std::atomic_bool _closed{false};
+
+    DISALLOW_COPY_AND_ASSIGN(RemoteReadableBlock);
+};
+
+// Stores the owning manager, the path descriptor and the (possibly null)
+// handle to the block's local file. Previously every argument was ignored,
+// leaving _block_manager/_file/_closed uninitialized and _path_desc empty.
+RemoteReadableBlock::RemoteReadableBlock(
+        RemoteBlockManager* block_manager, const FilePathDesc& path_desc,
+        std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle)
+        : _block_manager(block_manager),
+          _path_desc(path_desc),
+          _file_handle(std::move(file_handle)),
+          // open_block passes a null handle when no local copy exists.
+          _file(_file_handle != nullptr ? _file_handle->file() : nullptr),
+          _closed(false) {}
+
+// Every member releases its own resources.
+RemoteReadableBlock::~RemoteReadableBlock() = default;
+
+// Closing is not implemented for remote blocks yet.
+Status RemoteReadableBlock::close() { return Status::IOError("invalid function", 0, ""); }
+
+BlockManager* RemoteReadableBlock::block_manager() const { return _block_manager; }
+
+// Remote blocks carry no BlockId yet, so asking for one aborts via CHECK.
+const BlockId& RemoteReadableBlock::id() const {
+    CHECK(false) << "Not support Block.id(). (TODO)";
+    return _block_id;
+}
+
+const FilePathDesc& RemoteReadableBlock::path_desc() const { return _path_desc; }
+
+// Size queries are not implemented for remote blocks yet.
+Status RemoteReadableBlock::size(uint64_t* sz) const {
+    return Status::IOError("invalid function", 0, "");
+}
+
+// A single-slice read is forwarded as a one-element readv().
+Status RemoteReadableBlock::read(uint64_t offset, Slice result) const { return readv(offset, &result, 1); }
+
+// Vectored reads are not implemented for remote blocks yet.
+Status RemoteReadableBlock::readv(uint64_t offset, const Slice* results, size_t res_cnt) const {
+    return Status::IOError("invalid function", 0, "");
+}
+
+} // namespace internal
+
+////////////////////////////////////////////////////////////
+// RemoteBlockManager
+////////////////////////////////////////////////////////////
+
+// Keeps non-owning pointers to the local and remote envs and creates the
+// cache used by open_block() for handles to local files. Without the cache
+// _file_cache stayed null and the first open_block() on an existing local
+// file dereferenced it.
+RemoteBlockManager::RemoteBlockManager(Env* local_env, RemoteEnv* remote_env,
+                                       const BlockManagerOptions& opts)
+        : _local_env(local_env), _remote_env(remote_env), _opts(opts) {
+    _file_cache.reset(new FileCache<RandomAccessFile>("Remote_readable_file_cache",
+                                                      config::file_descriptor_cache_capacity));
+}
+
+RemoteBlockManager::~RemoteBlockManager() = default;
+
+// open() is not implemented yet.
+Status RemoteBlockManager::open() { return Status::NotSupported("to be implemented. (TODO)"); }
+
+// Creates a writable block whose data goes to a newly created local file at
+// opts.path_desc.filepath. Creating blocks through a read-only manager is a
+// programming error and aborts via CHECK.
+Status RemoteBlockManager::create_block(const CreateBlockOptions& opts,
+                                        std::unique_ptr<WritableBlock>* block) {
+    CHECK(!_opts.read_only);
+
+    shared_ptr<WritableFile> local_writer;
+    WritableFileOptions wr_opts;
+    wr_opts.mode = Env::MUST_CREATE;
+    // Use the injected local env, like delete_block()/link_file(), rather than
+    // the process default.
+    RETURN_IF_ERROR(env_util::open_file_for_write(wr_opts, _local_env, opts.path_desc.filepath,
+                                                  &local_writer));
+
+    VLOG_CRITICAL << "Creating new remote block. local: " << opts.path_desc.filepath
+                  << ", remote: " << opts.path_desc.remote_path;
+    block->reset(
+            new internal::RemoteWritableBlock(this, opts.path_desc, std::move(local_writer)));
+    return Status::OK();
+}
+
+// Opens a readable block. When the local copy exists, a handle to it is
+// taken from (or inserted into) the file cache; otherwise the block gets a
+// null handle.
+Status RemoteBlockManager::open_block(const FilePathDesc& path_desc,
+                                      std::unique_ptr<ReadableBlock>* block) {
+    VLOG_CRITICAL << "Opening remote block. local: " << path_desc.filepath
+                  << ", remote: " << path_desc.remote_path;
+    std::shared_ptr<OpenedFileHandle<RandomAccessFile>> file_handle;
+    if (_local_env->path_exists(path_desc.filepath).ok()) {
+        // Fail cleanly instead of dereferencing a cache that was never created.
+        if (_file_cache == nullptr) {
+            return Status::InternalError("remote block manager file cache is not initialized");
+        }
+        file_handle = std::make_shared<OpenedFileHandle<RandomAccessFile>>();
+        if (!_file_cache->lookup(path_desc.filepath, file_handle.get())) {
+            std::unique_ptr<RandomAccessFile> file;
+            RETURN_IF_ERROR(_local_env->new_random_access_file(path_desc.filepath, &file));
+            _file_cache->insert(path_desc.filepath, file.release(), file_handle.get());
+        }
+    }
+
+    block->reset(new internal::RemoteReadableBlock(this, path_desc, std::move(file_handle)));
+    return Status::OK();
+}
+
+// Deletes the block's local copy (when present) and its remote copy; with
+// is_dir set, whole directories are removed on both sides. An empty
+// remote_path means there is no remote copy, so the remote side is skipped
+// for files as well as directories.
+Status RemoteBlockManager::delete_block(const FilePathDesc& path_desc, bool is_dir) {
+    if (_local_env->path_exists(path_desc.filepath).ok()) {
+        if (is_dir) {
+            RETURN_IF_ERROR(_local_env->delete_dir(path_desc.filepath));
+        } else {
+            RETURN_IF_ERROR(_local_env->delete_file(path_desc.filepath));
+        }
+    }
+    if (path_desc.remote_path.empty()) {
+        return Status::OK();
+    }
+    // The remote env comes from a dynamic_cast and may be null.
+    if (_remote_env == nullptr) {
+        return Status::InternalError("remote env is not initialized");
+    }
+    if (is_dir) {
+        RETURN_IF_ERROR(_remote_env->delete_dir(path_desc.remote_path));
+    } else if (_remote_env->path_exists(path_desc.remote_path).ok()) {
+        RETURN_IF_ERROR(_remote_env->delete_file(path_desc.remote_path));
+    }
+    return Status::OK();
+}
+
+// Links the source's local copy and, when the source has a remote path, its
+// remote copy to the destination. Source copies that do not exist are skipped.
+Status RemoteBlockManager::link_file(const FilePathDesc& src_path_desc,
+                                     const FilePathDesc& dest_path_desc) {
+    if (_local_env->path_exists(src_path_desc.filepath).ok()) {
+        RETURN_IF_ERROR(_local_env->link_file(src_path_desc.filepath, dest_path_desc.filepath));
+    }
+    if (src_path_desc.remote_path.empty()) {
+        return Status::OK();
+    }
+    // The remote env comes from a dynamic_cast and may be null.
+    if (_remote_env == nullptr) {
+        return Status::InternalError("remote env is not initialized");
+    }
+    if (_remote_env->path_exists(src_path_desc.remote_path).ok()) {
+        RETURN_IF_ERROR(
+                _remote_env->link_file(src_path_desc.remote_path, dest_path_desc.remote_path));
+    }
+    return Status::OK();
+}
+
+} // namespace fs
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/fs/remote_block_manager.h b/be/src/olap/fs/remote_block_manager.h
new file mode 100644
index 0000000..56db5d9
--- /dev/null
+++ b/be/src/olap/fs/remote_block_manager.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/fs/block_manager.h"
+#include "util/file_cache.h"
+
+namespace doris {
+
+class BlockId;
+class Env;
+class RemoteEnv;
+
+namespace fs {
+
+// The remote-backed block manager.
+//
+// A block is described by a FilePathDesc holding a local filepath and a
+// remote_path; delete_block() and link_file() act on both copies.
+class RemoteBlockManager : public BlockManager {
+public:
+    // Note: all objects passed as pointers should remain alive for the lifetime
+    // of the block manager.
+    RemoteBlockManager(Env* local_env, RemoteEnv* remote_env, const BlockManagerOptions& opts);
+    virtual ~RemoteBlockManager();
+
+    Status open() override;
+
+    Status create_block(const CreateBlockOptions& opts,
+                        std::unique_ptr<WritableBlock>* block) override;
+    Status open_block(const FilePathDesc& path_desc, std::unique_ptr<ReadableBlock>* block) override;
+
+    Status get_all_block_ids(std::vector<BlockId>* block_ids) override {
+        // TODO(lingbin): to be implemented after we assign each block an id
+        return Status::OK();
+    }
+
+    // Deletes the block's local and remote copies (directories when is_dir).
+    Status delete_block(const FilePathDesc& path_desc, bool is_dir = false) override;
+
+    Status link_file(const FilePathDesc& src_path_desc, const FilePathDesc& dest_path_desc) override;
+
+private:
+    Env* _local_env;
+    RemoteEnv* _remote_env;
+    const BlockManagerOptions _opts;
+    // Underlying cache instance. Caches opened files.
+    std::unique_ptr<FileCache<RandomAccessFile>> _file_cache;
+};
+
+} // namespace fs
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 7fe970e..64802e5 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -31,6 +31,7 @@
#include <unordered_map>
#include <unordered_set>
+#include "env/env.h"
#include "gen_cpp/Types_types.h"
#include "olap/olap_define.h"
#include "util/hash_util.hpp"
@@ -52,7 +53,7 @@ typedef UniqueId TabletUid;
enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 };
struct DataDirInfo {
- std::string path;
+ FilePathDesc path_desc;
size_t path_hash = 0;
int64_t disk_capacity = 1; // actual disk capacity
int64_t available = 0; // 可用空间,单位字节
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index ea34452..c2d4b7f 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -79,6 +79,7 @@ enum OLAPDataVersion {
static const std::string MINI_PREFIX = "/mini_download";
static const std::string CLUSTER_ID_PREFIX = "/cluster_id";
static const std::string DATA_PREFIX = "/data";
+static const std::string TABLET_UID = "/tablet_uid"; // NOTE(review): path component like the *_PREFIX constants around it; its exact use is not visible here — confirm
static const std::string DPP_PREFIX = "/dpp_download";
static const std::string SNAPSHOT_PREFIX = "/snapshot";
static const std::string TRASH_PREFIX = "/trash";
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ddff058..77243b4 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -100,6 +100,9 @@ Status StorageEngine::start_bg_threads() {
// path scan and gc thread
if (config::path_gc_check) {
for (auto data_dir : get_stores()) {
+ if (data_dir->is_remote()) {
+ continue;
+ }
scoped_refptr<Thread> path_scan_thread;
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_scan_thread",
@@ -491,6 +494,9 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks(
? copied_cumu_map[data_dir]
: copied_base_map[data_dir],
&disk_max_score, _cumulative_compaction_policy);
+ if (data_dir->is_remote()) {
+ continue;
+ }
if (tablet != nullptr) {
if (need_pick_tablet) {
tablets_compaction.emplace_back(tablet);
diff --git a/be/src/olap/options.h b/be/src/olap/options.h
index fd6d9c9..65165de 100644
--- a/be/src/olap/options.h
+++ b/be/src/olap/options.h
@@ -35,6 +35,7 @@ struct StorePath {
std::string path;
int64_t capacity_bytes;
TStorageMedium::type storage_medium;
+ std::string remote_path;
};
// parse a single root path of storage_root_path
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index e75324f..f4ab2f9 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -228,7 +228,7 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
- context.rowset_path_prefix = cur_tablet->tablet_path();
+ context.path_desc = cur_tablet->tablet_path_desc();
context.tablet_schema = &(cur_tablet->tablet_schema());
context.rowset_state = PREPARED;
context.txn_id = _request.transaction_id;
@@ -410,7 +410,7 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new
if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
context.rowset_type = BETA_ROWSET;
}
- context.rowset_path_prefix = cur_tablet->tablet_path();
+ context.path_desc = cur_tablet->tablet_path_desc();
context.tablet_schema = &(cur_tablet->tablet_schema());
context.rowset_state = PREPARED;
context.txn_id = _request.transaction_id;
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index 7a98d82..62ae5c9 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -26,9 +26,9 @@
namespace doris {
-AlphaRowset::AlphaRowset(const TabletSchema* schema, std::string rowset_path,
+AlphaRowset::AlphaRowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta)
- : Rowset(schema, std::move(rowset_path), std::move(rowset_meta)) {}
+ : Rowset(schema, rowset_path_desc, std::move(rowset_meta)) {} // the path descriptor is forwarded unchanged to the Rowset base
OLAPStatus AlphaRowset::do_load(bool use_cache) {
for (auto& segment_group : _segment_groups) {
@@ -98,9 +98,9 @@ void AlphaRowset::make_visible_extra(Version version, VersionHash version_hash)
}
}
-OLAPStatus AlphaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id) {
+OLAPStatus AlphaRowset::link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) {
for (auto& segment_group : _segment_groups) {
- auto status = segment_group->link_segments_to_path(dir, new_rowset_id);
+ auto status = segment_group->link_segments_to_path(dir_desc.filepath, new_rowset_id);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "create hard links failed for segment group:"
<< segment_group->segment_group_id();
@@ -316,12 +316,12 @@ OLAPStatus AlphaRowset::init() {
std::shared_ptr<SegmentGroup> segment_group;
if (_is_pending) {
segment_group.reset(new SegmentGroup(
- _rowset_meta->tablet_id(), _rowset_meta->rowset_id(), _schema, _rowset_path,
+ _rowset_meta->tablet_id(), _rowset_meta->rowset_id(), _schema, _rowset_path_desc.filepath,
false, segment_group_meta.segment_group_id(), segment_group_meta.num_segments(),
true, _rowset_meta->partition_id(), _rowset_meta->txn_id()));
} else {
segment_group.reset(new SegmentGroup(
- _rowset_meta->tablet_id(), _rowset_meta->rowset_id(), _schema, _rowset_path,
+ _rowset_meta->tablet_id(), _rowset_meta->rowset_id(), _schema, _rowset_path_desc.filepath,
_rowset_meta->version(), _rowset_meta->version_hash(), false,
segment_group_meta.segment_group_id(), segment_group_meta.num_segments()));
}
diff --git a/be/src/olap/rowset/alpha_rowset.h b/be/src/olap/rowset/alpha_rowset.h
index 0fc3f71..84b17ad 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -51,7 +51,7 @@ public:
OLAPStatus remove() override;
- OLAPStatus link_files_to(const std::string& dir, RowsetId new_rowset_id) override;
+ OLAPStatus link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) override;
OLAPStatus copy_files_to(const std::string& dir) override;
@@ -70,7 +70,7 @@ public:
protected:
friend class RowsetFactory;
- AlphaRowset(const TabletSchema* schema, std::string rowset_path,
+ AlphaRowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta);
// init segment groups
diff --git a/be/src/olap/rowset/alpha_rowset_writer.cpp b/be/src/olap/rowset/alpha_rowset_writer.cpp
index 8cd75e7..58630a8 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.cpp
+++ b/be/src/olap/rowset/alpha_rowset_writer.cpp
@@ -99,7 +99,7 @@ OLAPStatus AlphaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
for (auto& segment_group : alpha_rowset->_segment_groups) {
RETURN_NOT_OK(_init());
RETURN_NOT_OK(segment_group->link_segments_to_path(
- _rowset_writer_context.rowset_path_prefix, _rowset_writer_context.rowset_id));
+ _rowset_writer_context.path_desc.filepath, _rowset_writer_context.rowset_id));
_cur_segment_group->set_empty(segment_group->empty());
_cur_segment_group->set_num_segments(segment_group->num_segments());
_cur_segment_group->add_zone_maps(segment_group->get_zone_maps());
@@ -123,7 +123,7 @@ OLAPStatus AlphaRowsetWriter::add_rowset_for_linked_schema_change(
for (auto& segment_group : alpha_rowset->_segment_groups) {
RETURN_NOT_OK(_init());
RETURN_NOT_OK(segment_group->link_segments_to_path(
- _rowset_writer_context.rowset_path_prefix, _rowset_writer_context.rowset_id));
+ _rowset_writer_context.path_desc.filepath, _rowset_writer_context.rowset_id));
_cur_segment_group->set_empty(segment_group->empty());
_cur_segment_group->set_num_segments(segment_group->num_segments());
_cur_segment_group->add_zone_maps_for_linked_schema_change(segment_group->get_zone_maps(),
@@ -222,7 +222,7 @@ RowsetSharedPtr AlphaRowsetWriter::build() {
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(_rowset_writer_context.tablet_schema,
- _rowset_writer_context.rowset_path_prefix,
+ _rowset_writer_context.path_desc,
_current_rowset_meta, &rowset);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
@@ -252,13 +252,13 @@ OLAPStatus AlphaRowsetWriter::_init() {
if (_is_pending_rowset) {
_cur_segment_group = new (std::nothrow) SegmentGroup(
_rowset_writer_context.tablet_id, _rowset_writer_context.rowset_id,
- _rowset_writer_context.tablet_schema, _rowset_writer_context.rowset_path_prefix,
+ _rowset_writer_context.tablet_schema, _rowset_writer_context.path_desc.filepath,
false, _segment_group_id, 0, true, _rowset_writer_context.partition_id,
_rowset_writer_context.txn_id);
} else {
_cur_segment_group = new (std::nothrow) SegmentGroup(
_rowset_writer_context.tablet_id, _rowset_writer_context.rowset_id,
- _rowset_writer_context.tablet_schema, _rowset_writer_context.rowset_path_prefix,
+ _rowset_writer_context.tablet_schema, _rowset_writer_context.path_desc.filepath,
_rowset_writer_context.version, _rowset_writer_context.version_hash, false,
_segment_group_id, 0);
}
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 8e228b4..99ce9e5 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -23,20 +23,23 @@
#include <set>
+#include "env/env_remote.h"
#include "gutil/strings/substitute.h"
#include "olap/rowset/beta_rowset_reader.h"
#include "olap/utils.h"
namespace doris {
-std::string BetaRowset::segment_file_path(const std::string& dir, const RowsetId& rowset_id,
+FilePathDesc BetaRowset::segment_file_path(const FilePathDesc& segment_dir_desc, const RowsetId& rowset_id,
int segment_id) {
- return strings::Substitute("$0/$1_$2.dat", dir, rowset_id.to_string(), segment_id);
+ FilePathDescStream path_desc_s; // NOTE(review): assumed to append the suffix to both filepath and remote_path — confirm in FilePathDescStream
+ path_desc_s << segment_dir_desc << "/" << rowset_id.to_string() << "_" << segment_id << ".dat"; // "<dir>/<rowset_id>_<segment_id>.dat", the same layout as the former string version
+ return path_desc_s.path_desc();
}
-BetaRowset::BetaRowset(const TabletSchema* schema, string rowset_path,
+BetaRowset::BetaRowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta)
- : Rowset(schema, std::move(rowset_path), std::move(rowset_meta)) {}
+ : Rowset(schema, rowset_path_desc, std::move(rowset_meta)) {} // the path descriptor is forwarded unchanged to the Rowset base
BetaRowset::~BetaRowset() {}
@@ -52,12 +55,12 @@ OLAPStatus BetaRowset::do_load(bool /*use_cache*/) {
OLAPStatus BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
- std::string seg_path = segment_file_path(_rowset_path, rowset_id(), seg_id);
+ FilePathDesc seg_path_desc = segment_file_path(_rowset_path_desc, rowset_id(), seg_id);
std::shared_ptr<segment_v2::Segment> segment;
- auto s = segment_v2::Segment::open(seg_path, seg_id, _schema, &segment);
+ auto s = segment_v2::Segment::open(seg_path_desc, seg_id, _schema, &segment);
if (!s.ok()) {
- LOG(WARNING) << "failed to open segment " << seg_path << " under rowset " << unique_id()
- << " : " << s.to_string();
+ LOG(WARNING) << "failed to open segment. " << seg_path_desc.debug_string()
+ << " under rowset " << unique_id() << " : " << s.to_string();
return OLAP_ERR_ROWSET_LOAD_FAILED;
}
segments->push_back(std::move(segment));
@@ -94,13 +97,13 @@ OLAPStatus BetaRowset::remove() {
<< ", tabletid:" << _rowset_meta->tablet_id();
bool success = true;
for (int i = 0; i < num_segments(); ++i) {
- std::string path = segment_file_path(_rowset_path, rowset_id(), i);
- LOG(INFO) << "deleting " << path;
- // TODO(lingbin): use Env API
- if (::remove(path.c_str()) != 0) {
+ FilePathDesc path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
+ LOG(INFO) << "deleting " << path_desc.debug_string();
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(path_desc.storage_medium);
+ if (!block_mgr->delete_block(path_desc).ok()) {
char errmsg[64];
LOG(WARNING) << "failed to delete file. err=" << strerror_r(errno, errmsg, 64)
- << ", path=" << path;
+ << ", " << path_desc.debug_string();
success = false;
}
}
@@ -115,21 +118,21 @@ void BetaRowset::do_close() {
// do nothing.
}
-OLAPStatus BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id) {
+OLAPStatus BetaRowset::link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) {
for (int i = 0; i < num_segments(); ++i) {
- std::string dst_link_path = segment_file_path(dir, new_rowset_id, i);
+ FilePathDesc dst_link_path_desc = segment_file_path(dir_desc, new_rowset_id, i);
// TODO(lingbin): use Env API? or EnvUtil?
- if (FileUtils::check_exist(dst_link_path)) {
- LOG(WARNING) << "failed to create hard link, file already exist: " << dst_link_path;
+ if (FileUtils::check_exist(dst_link_path_desc.filepath)) {
+ LOG(WARNING) << "failed to create hard link, file already exist: " << dst_link_path_desc.filepath;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
- std::string src_file_path = segment_file_path(_rowset_path, rowset_id(), i);
+ FilePathDesc src_file_path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
// TODO(lingbin): how external storage support link?
// use copy? or keep refcount to avoid being delete?
- if (link(src_file_path.c_str(), dst_link_path.c_str()) != 0) {
- LOG(WARNING) << "fail to create hard link. from=" << src_file_path << ", "
- << "to=" << dst_link_path << ", "
- << "errno=" << Errno::no();
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(dir_desc.storage_medium);
+ if (!block_mgr->link_file(src_file_path_desc, dst_link_path_desc).ok()) {
+ LOG(WARNING) << "fail to create hard link. from=" << src_file_path_desc.debug_string() << ", "
+ << "to=" << dst_link_path_desc.debug_string() << ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
}
@@ -137,18 +140,52 @@ OLAPStatus BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset
}
OLAPStatus BetaRowset::copy_files_to(const std::string& dir) {
+    // Existence checks and the copy both go through the env of this rowset's medium.
+    Env* env = Env::get_env(_rowset_path_desc.storage_medium);
for (int i = 0; i < num_segments(); ++i) {
- std::string dst_path = segment_file_path(dir, rowset_id(), i);
- if (FileUtils::check_exist(dst_path)) {
- LOG(WARNING) << "file already exist: " << dst_path;
+        FilePathDesc dst_path_desc = segment_file_path(dir, rowset_id(), i);
+        Status status = env->path_exists(dst_path_desc.filepath);
+        if (status.ok()) {
+            LOG(WARNING) << "file already exist: " << dst_path_desc.filepath;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
- std::string src_path = segment_file_path(_rowset_path, rowset_id(), i);
- if (copy_file(src_path, dst_path) != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to copy file. from=" << src_path << ", to=" << dst_path
- << ", errno=" << Errno::no();
+        // Anything other than NotFound means the existence check itself failed.
+        if (!status.is_not_found()) {
+            LOG(WARNING) << "file check exist error: " << dst_path_desc.filepath
+                         << ", status=" << status.to_string();
return OLAP_ERR_OS_ERROR;
}
+        FilePathDesc src_path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
+        // Log the returned Status: errno is not guaranteed to describe an Env failure.
+        Status copy_status = env->copy_path(src_path_desc.filepath, dst_path_desc.filepath);
+        if (!copy_status.ok()) {
+            LOG(WARNING) << "fail to copy file. from=" << src_path_desc.filepath << ", to="
+                         << dst_path_desc.filepath << ", status=" << copy_status.to_string();
+            return OLAP_ERR_OS_ERROR;
+        }
+    }
+    return OLAP_SUCCESS;
+}
+
+// Uploads every segment file from this rowset's local path to the remote
+// path described by dir_desc. Fails if a destination already exists or if
+// an existence check or upload fails.
+OLAPStatus BetaRowset::upload_files_to(const FilePathDesc& dir_desc) {
+    // The destination descriptor, not this rowset's own (possibly local) medium,
+    // selects the remote env; for a local medium the cast would yield null.
+    RemoteEnv* dest_env = dynamic_cast<RemoteEnv*>(Env::get_env(dir_desc.storage_medium));
+    if (dest_env == nullptr) {
+        LOG(WARNING) << "upload destination is not remote storage. " << dir_desc.debug_string();
+        return OLAP_ERR_OS_ERROR;
+    }
+    std::shared_ptr<StorageBackend> storage_backend = dest_env->get_storage_backend();
+    if (storage_backend == nullptr) {
+        LOG(WARNING) << "no storage backend for upload destination. "
+                     << dir_desc.debug_string();
+        return OLAP_ERR_OS_ERROR;
+    }
+    for (int i = 0; i < num_segments(); ++i) {
+        FilePathDesc dst_path_desc = segment_file_path(dir_desc, rowset_id(), i);
+        Status status = storage_backend->exist(dst_path_desc.remote_path);
+        if (status.ok()) {
+            LOG(WARNING) << "file already exist: " << dst_path_desc.remote_path;
+            return OLAP_ERR_FILE_ALREADY_EXIST;
+        }
+        if (!status.is_not_found()) {
+            LOG(WARNING) << "file check exist error: " << dst_path_desc.remote_path
+                         << ", status=" << status.to_string();
+            return OLAP_ERR_OS_ERROR;
+        }
+        FilePathDesc src_path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
+
+        Status upload_status =
+                storage_backend->upload(src_path_desc.filepath, dst_path_desc.remote_path);
+        if (!upload_status.ok()) {
+            LOG(WARNING) << "fail to upload file. from=" << src_path_desc.filepath << ", to="
+                         << dst_path_desc.remote_path << ", status=" << upload_status.to_string();
+            return OLAP_ERR_OS_ERROR;
+        }
+        LOG(INFO) << "succeed to upload file. from " << src_path_desc.filepath << " to "
+                  << dst_path_desc.remote_path;
}
return OLAP_SUCCESS;
}
@@ -156,16 +193,19 @@ OLAPStatus BetaRowset::copy_files_to(const std::string& dir) {
bool BetaRowset::check_path(const std::string& path) {
std::set<std::string> valid_paths;
for (int i = 0; i < num_segments(); ++i) {
- valid_paths.insert(segment_file_path(_rowset_path, rowset_id(), i));
+ FilePathDesc path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
+ valid_paths.insert(path_desc.filepath); // only local segment file paths are matched; remote_path is not considered
}
return valid_paths.find(path) != valid_paths.end();
}
bool BetaRowset::check_file_exist() {
+ Env* env = Env::get_env(_rowset_path_desc.storage_medium);
for (int i = 0; i < num_segments(); ++i) {
- std::string data_file = segment_file_path(_rowset_path, rowset_id(), i);
- if (!FileUtils::check_exist(data_file)) {
- LOG(WARNING) << "data file not existed: " << data_file << " for rowset_id: " << rowset_id();
+ FilePathDesc path_desc = segment_file_path(_rowset_path_desc, rowset_id(), i);
+ if (!env->path_exists(path_desc.filepath).ok()) {
+ LOG(WARNING) << "data file not existed: " << path_desc.filepath
+ << " for rowset_id: " << rowset_id();
return false;
}
}
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 9db69bf..5030b29 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -42,7 +42,7 @@ public:
OLAPStatus create_reader(const std::shared_ptr<MemTracker>& parent_tracker,
std::shared_ptr<RowsetReader>* result) override;
- static std::string segment_file_path(const std::string& segment_dir, const RowsetId& rowset_id,
+ static FilePathDesc segment_file_path(const FilePathDesc& segment_dir_desc, const RowsetId& rowset_id,
int segment_id);
OLAPStatus split_range(const RowCursor& start_key, const RowCursor& end_key,
@@ -51,10 +51,12 @@ public:
OLAPStatus remove() override;
- OLAPStatus link_files_to(const std::string& dir, RowsetId new_rowset_id) override;
+ OLAPStatus link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) override;
OLAPStatus copy_files_to(const std::string& dir) override;
+ OLAPStatus upload_files_to(const FilePathDesc& dir_desc) override;
+
// only applicable to alpha rowset, no op here
OLAPStatus remove_old_files(std::vector<std::string>* files_to_remove) override {
return OLAP_SUCCESS;
@@ -67,7 +69,7 @@ public:
OLAPStatus load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
protected:
- BetaRowset(const TabletSchema* schema, std::string rowset_path,
+ BetaRowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta);
// init segment groups
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 3b15565..d2d317f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -53,14 +53,15 @@ BetaRowsetWriter::~BetaRowsetWriter() {
if (!_already_built) { // abnormal exit, remove all files generated
_segment_writer.reset(); // ensure all files are closed
Status st;
+ Env* env = Env::get_env(_context.path_desc.storage_medium);
for (int i = 0; i < _num_segment; ++i) {
- auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix,
+ auto path_desc = BetaRowset::segment_file_path(_context.path_desc,
_context.rowset_id, i);
// Even if an error is encountered, these files that have not been cleaned up
// will be cleaned up by the GC background. So here we only print the error
// message when we encounter an error.
- WARN_IF_ERROR(Env::Default()->delete_file(path),
- strings::Substitute("Failed to delete file=$0", path));
+ WARN_IF_ERROR(env->delete_file(path_desc.filepath),
+ strings::Substitute("Failed to delete file=$0", path_desc.filepath));
}
}
}
@@ -112,7 +113,7 @@ template OLAPStatus BetaRowsetWriter::_add_row(const ContiguousRow& row);
OLAPStatus BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
- RETURN_NOT_OK(rowset->link_files_to(_context.rowset_path_prefix, _context.rowset_id));
+ RETURN_NOT_OK(rowset->link_files_to(_context.path_desc, _context.rowset_id));
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
@@ -196,7 +197,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
RowsetSharedPtr rowset;
- auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_path_prefix,
+ auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.path_desc,
_rowset_meta, &rowset);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
@@ -207,24 +208,24 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
- auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id,
+ auto path_desc = BetaRowset::segment_file_path(_context.path_desc, _context.rowset_id,
_num_segment++);
// TODO(lingbin): should use a more general way to get BlockManager object
// and tablets with the same type should share one BlockManager object;
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_context.path_desc.storage_medium);
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({path});
+ fs::CreateBlockOptions opts(path_desc);
DCHECK(block_mgr != nullptr);
Status st = block_mgr->create_block(opts, &wblock);
if (!st.ok()) {
- LOG(WARNING) << "failed to create writable block. path=" << path;
+ LOG(WARNING) << "failed to create writable block. path=" << path_desc.filepath;
return OLAP_ERR_INIT_FAILED;
}
DCHECK(wblock != nullptr);
segment_v2::SegmentWriterOptions writer_options;
writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
- _context.tablet_schema, writer_options, _context.parent_mem_tracker));
+ _context.tablet_schema, writer_options, _context.parent_mem_tracker));
{
std::lock_guard<SpinLock> l(_lock);
_wblocks.push_back(std::move(wblock));
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index 385657b..79f7371 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -21,9 +21,9 @@
namespace doris {
-Rowset::Rowset(const TabletSchema* schema, std::string rowset_path, RowsetMetaSharedPtr rowset_meta)
+Rowset::Rowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc, RowsetMetaSharedPtr rowset_meta)
: _schema(schema),
- _rowset_path(std::move(rowset_path)),
+ _rowset_path_desc(rowset_path_desc),
_rowset_meta(std::move(rowset_meta)),
_refs_by_reader(0),
_rowset_state_machine(RowsetStateMachine()) {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 19bdf7b..1983a36 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -23,6 +23,7 @@
#include <mutex>
#include <vector>
+#include "env/env.h"
#include "gen_cpp/olap_file.pb.h"
#include "gutil/macros.h"
#include "olap/rowset/rowset_meta.h"
@@ -198,11 +199,13 @@ public:
}
// hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
- virtual OLAPStatus link_files_to(const std::string& dir, RowsetId new_rowset_id) = 0;
+ virtual OLAPStatus link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) = 0;
// copy all files to `dir`
virtual OLAPStatus copy_files_to(const std::string& dir) = 0;
+ virtual OLAPStatus upload_files_to(const FilePathDesc& dir_desc) { return OLAP_SUCCESS; }
+
virtual OLAPStatus remove_old_files(std::vector<std::string>* files_to_remove) = 0;
// return whether `path` is one of the files in this rowset
@@ -211,7 +214,7 @@ public:
virtual bool check_file_exist() = 0;
// return an unique identifier string for this rowset
- std::string unique_id() const { return _rowset_path + "/" + rowset_id().to_string(); }
+ std::string unique_id() const { return _rowset_path_desc.filepath + "/" + rowset_id().to_string(); }
bool need_delete_file() const { return _need_delete_file; }
@@ -254,7 +257,7 @@ protected:
DISALLOW_COPY_AND_ASSIGN(Rowset);
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
- Rowset(const TabletSchema* schema, std::string rowset_path, RowsetMetaSharedPtr rowset_meta);
+ Rowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc, RowsetMetaSharedPtr rowset_meta);
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
virtual OLAPStatus init() = 0;
@@ -269,7 +272,7 @@ protected:
virtual void make_visible_extra(Version version, VersionHash version_hash) {}
const TabletSchema* _schema;
- std::string _rowset_path;
+ FilePathDesc _rowset_path_desc;
RowsetMetaSharedPtr _rowset_meta;
// init in constructor
bool _is_pending; // rowset is pending iff it's not in visible state
diff --git a/be/src/olap/rowset/rowset_converter.cpp b/be/src/olap/rowset/rowset_converter.cpp
index ce20abe..d6d3a56 100644
--- a/be/src/olap/rowset/rowset_converter.cpp
+++ b/be/src/olap/rowset/rowset_converter.cpp
@@ -24,19 +24,19 @@
namespace doris {
OLAPStatus RowsetConverter::convert_beta_to_alpha(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path,
+ const FilePathDesc& rowset_path_desc,
RowsetMetaPB* dst_rs_meta_pb) {
- return _convert_rowset(src_rowset_meta, rowset_path, ALPHA_ROWSET, dst_rs_meta_pb);
+ return _convert_rowset(src_rowset_meta, rowset_path_desc, ALPHA_ROWSET, dst_rs_meta_pb);
}
OLAPStatus RowsetConverter::convert_alpha_to_beta(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path,
+ const FilePathDesc& rowset_path_desc,
RowsetMetaPB* dst_rs_meta_pb) {
- return _convert_rowset(src_rowset_meta, rowset_path, BETA_ROWSET, dst_rs_meta_pb);
+ return _convert_rowset(src_rowset_meta, rowset_path_desc, BETA_ROWSET, dst_rs_meta_pb);
}
OLAPStatus RowsetConverter::_convert_rowset(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path, RowsetTypePB dst_type,
+ const FilePathDesc& rowset_path_desc, RowsetTypePB dst_type,
RowsetMetaPB* dst_rs_meta_pb) {
const TabletSchema& tablet_schema = _tablet_meta->tablet_schema();
RowsetWriterContext context;
@@ -46,7 +46,7 @@ OLAPStatus RowsetConverter::_convert_rowset(const RowsetMetaSharedPtr& src_rowse
context.partition_id = _tablet_meta->partition_id();
context.tablet_schema_hash = _tablet_meta->schema_hash();
context.rowset_type = dst_type;
- context.rowset_path_prefix = rowset_path;
+ context.path_desc = rowset_path_desc;
context.tablet_schema = &tablet_schema;
context.rowset_state = src_rowset_meta->rowset_state();
context.segments_overlap = src_rowset_meta->segments_overlap();
@@ -61,7 +61,7 @@ OLAPStatus RowsetConverter::_convert_rowset(const RowsetMetaSharedPtr& src_rowse
RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &rowset_writer));
if (!src_rowset_meta->empty()) {
RowsetSharedPtr rowset;
- RETURN_NOT_OK(RowsetFactory::create_rowset(&tablet_schema, rowset_path, src_rowset_meta,
+ RETURN_NOT_OK(RowsetFactory::create_rowset(&tablet_schema, rowset_path_desc, src_rowset_meta,
&rowset));
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK(rowset->create_reader(&rowset_reader));
diff --git a/be/src/olap/rowset/rowset_converter.h b/be/src/olap/rowset/rowset_converter.h
index 90c45f1..4faa284 100644
--- a/be/src/olap/rowset/rowset_converter.h
+++ b/be/src/olap/rowset/rowset_converter.h
@@ -41,15 +41,15 @@ public:
RowsetConverter(const TabletMetaSharedPtr& tablet_meta) : _tablet_meta(tablet_meta) {}
OLAPStatus convert_beta_to_alpha(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path, RowsetMetaPB* dst_rs_meta_pb);
+ const FilePathDesc& rowset_path_desc, RowsetMetaPB* dst_rs_meta_pb);
OLAPStatus convert_alpha_to_beta(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path, RowsetMetaPB* dst_rs_meta_pb);
+ const FilePathDesc& rowset_path_desc, RowsetMetaPB* dst_rs_meta_pb);
private:
OLAPStatus _convert_rowset(const RowsetMetaSharedPtr& src_rowset_meta,
- const std::string& rowset_path, RowsetTypePB dst_type,
- RowsetMetaPB* dst_rs_meta_pb);
+ const FilePathDesc& rowset_path_desc,
+ RowsetTypePB dst_type, RowsetMetaPB* dst_rs_meta_pb);
private:
TabletMetaSharedPtr _tablet_meta;
diff --git a/be/src/olap/rowset/rowset_factory.cpp b/be/src/olap/rowset/rowset_factory.cpp
index df493b6..915c56c 100644
--- a/be/src/olap/rowset/rowset_factory.cpp
+++ b/be/src/olap/rowset/rowset_factory.cpp
@@ -28,14 +28,14 @@
namespace doris {
-OLAPStatus RowsetFactory::create_rowset(const TabletSchema* schema, const std::string& rowset_path,
+OLAPStatus RowsetFactory::create_rowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset) {
if (rowset_meta->rowset_type() == ALPHA_ROWSET) {
- rowset->reset(new AlphaRowset(schema, rowset_path, rowset_meta));
+ rowset->reset(new AlphaRowset(schema, rowset_path_desc, rowset_meta));
return (*rowset)->init();
}
if (rowset_meta->rowset_type() == BETA_ROWSET) {
- rowset->reset(new BetaRowset(schema, rowset_path, rowset_meta));
+ rowset->reset(new BetaRowset(schema, rowset_path_desc, rowset_meta));
return (*rowset)->init();
}
return OLAP_ERR_ROWSET_TYPE_NOT_FOUND; // should never happen
diff --git a/be/src/olap/rowset/rowset_factory.h b/be/src/olap/rowset/rowset_factory.h
index b259c30..2d1382e 100644
--- a/be/src/olap/rowset/rowset_factory.h
+++ b/be/src/olap/rowset/rowset_factory.h
@@ -31,7 +31,7 @@ class RowsetFactory {
public:
// return OLAP_SUCCESS and set inited rowset in `*rowset`.
// return others if failed to create or init rowset.
- static OLAPStatus create_rowset(const TabletSchema* schema, const std::string& rowset_path,
+ static OLAPStatus create_rowset(const TabletSchema* schema, const FilePathDesc& rowset_path_desc,
RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset);
// create and init rowset writer.
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index d23a607..a0ad273 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -33,7 +33,6 @@ struct RowsetWriterContext {
tablet_schema_hash(0),
partition_id(0),
rowset_type(ALPHA_ROWSET),
- rowset_path_prefix(""),
tablet_schema(nullptr),
rowset_state(PREPARED),
version(Version(0, 0)),
@@ -49,7 +48,7 @@ struct RowsetWriterContext {
int64_t tablet_schema_hash;
int64_t partition_id;
RowsetTypePB rowset_type;
- std::string rowset_path_prefix;
+ FilePathDesc path_desc;
const TabletSchema* tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
diff --git a/be/src/olap/rowset/segment_group.cpp b/be/src/olap/rowset/segment_group.cpp
index 59ce460..d0e3588 100644
--- a/be/src/olap/rowset/segment_group.cpp
+++ b/be/src/olap/rowset/segment_group.cpp
@@ -765,7 +765,7 @@ OLAPStatus SegmentGroup::copy_files_to(const std::string& dir) {
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string data_file_to_copy = construct_data_file_path(segment_id);
- if (copy_file(data_file_to_copy, dest_data_file) != OLAP_SUCCESS) {
+ if (!FileUtils::copy_file(data_file_to_copy, dest_data_file).ok()) {
LOG(WARNING) << "fail to copy data file. from=" << data_file_to_copy
<< ", to=" << dest_data_file << ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
@@ -776,7 +776,7 @@ OLAPStatus SegmentGroup::copy_files_to(const std::string& dir) {
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string index_file_to_copy = construct_index_file_path(segment_id);
- if (copy_file(index_file_to_copy, dest_index_file) != OLAP_SUCCESS) {
+ if (!FileUtils::copy_file(index_file_to_copy, dest_index_file).ok()) {
LOG(WARNING) << "fail to copy index file. from=" << index_file_to_copy
<< ", to=" << dest_index_file << ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
index 43b5d68..d3603e9 100644
--- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp
@@ -27,8 +27,8 @@ Status BitmapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
const IndexedColumnMetaPB& bitmap_meta = _bitmap_index_meta->bitmap_column();
_has_null = _bitmap_index_meta->has_null();
- _dict_column_reader.reset(new IndexedColumnReader(_file_name, dict_meta));
- _bitmap_column_reader.reset(new IndexedColumnReader(_file_name, bitmap_meta));
+ _dict_column_reader.reset(new IndexedColumnReader(_path_desc, dict_meta));
+ _bitmap_column_reader.reset(new IndexedColumnReader(_path_desc, bitmap_meta));
RETURN_IF_ERROR(_dict_column_reader->load(use_page_cache, kept_in_memory));
RETURN_IF_ERROR(_bitmap_column_reader->load(use_page_cache, kept_in_memory));
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h
index 7e42ae9..4313169 100644
--- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.h
+++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.h
@@ -39,8 +39,8 @@ class IndexedColumnIterator;
class BitmapIndexReader {
public:
- explicit BitmapIndexReader(const std::string& file_name, const BitmapIndexPB* bitmap_index_meta)
- : _file_name(file_name), _bitmap_index_meta(bitmap_index_meta) {
+ explicit BitmapIndexReader(const FilePathDesc& path_desc, const BitmapIndexPB* bitmap_index_meta)
+ : _path_desc(path_desc), _bitmap_index_meta(bitmap_index_meta) {
_typeinfo = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
}
@@ -56,7 +56,7 @@ public:
private:
friend class BitmapIndexIterator;
- std::string _file_name;
+ FilePathDesc _path_desc;
const TypeInfo* _typeinfo;
const BitmapIndexPB* _bitmap_index_meta;
bool _has_null = false;
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
index 4f4fd91..d35e2be 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp
@@ -26,7 +26,7 @@ namespace segment_v2 {
Status BloomFilterIndexReader::load(bool use_page_cache, bool kept_in_memory) {
const IndexedColumnMetaPB& bf_index_meta = _bloom_filter_index_meta->bloom_filter();
- _bloom_filter_reader.reset(new IndexedColumnReader(_file_name, bf_index_meta));
+ _bloom_filter_reader.reset(new IndexedColumnReader(_path_desc, bf_index_meta));
RETURN_IF_ERROR(_bloom_filter_reader->load(use_page_cache, kept_in_memory));
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
index a46316d..bb9377e 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.h
@@ -42,9 +42,9 @@ class BloomFilter;
class BloomFilterIndexReader {
public:
- explicit BloomFilterIndexReader(const std::string& file_name,
+ explicit BloomFilterIndexReader(const FilePathDesc& path_desc,
const BloomFilterIndexPB* bloom_filter_index_meta)
- : _file_name(file_name), _bloom_filter_index_meta(bloom_filter_index_meta) {
+ : _path_desc(path_desc), _bloom_filter_index_meta(bloom_filter_index_meta) {
_typeinfo = get_type_info(OLAP_FIELD_TYPE_VARCHAR);
}
@@ -58,7 +58,7 @@ public:
private:
friend class BloomFilterIndexIterator;
- std::string _file_name;
+ FilePathDesc _path_desc;
const TypeInfo* _typeinfo;
const BloomFilterIndexPB* _bloom_filter_index_meta;
std::unique_ptr<IndexedColumnReader> _bloom_filter_reader;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 3d78ceb..3bbbeab 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -37,11 +37,11 @@ namespace segment_v2 {
using strings::Substitute;
Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
- uint64_t num_rows, const std::string& file_name,
+ uint64_t num_rows, const FilePathDesc& path_desc,
std::unique_ptr<ColumnReader>* reader) {
if (is_scalar_type((FieldType)meta.type())) {
std::unique_ptr<ColumnReader> reader_local(
- new ColumnReader(opts, meta, num_rows, file_name));
+ new ColumnReader(opts, meta, num_rows, path_desc));
RETURN_IF_ERROR(reader_local->init());
*reader = std::move(reader_local);
return Status::OK();
@@ -53,26 +53,26 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
std::unique_ptr<ColumnReader> item_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(0),
- meta.children_columns(0).num_rows(), file_name,
+ meta.children_columns(0).num_rows(), path_desc,
&item_reader));
RETURN_IF_ERROR(item_reader->init());
std::unique_ptr<ColumnReader> offset_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(1),
- meta.children_columns(1).num_rows(), file_name,
+ meta.children_columns(1).num_rows(), path_desc,
&offset_reader));
RETURN_IF_ERROR(offset_reader->init());
std::unique_ptr<ColumnReader> null_reader;
if (meta.is_nullable()) {
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(2),
- meta.children_columns(2).num_rows(), file_name,
+ meta.children_columns(2).num_rows(), path_desc,
&null_reader));
RETURN_IF_ERROR(null_reader->init());
}
std::unique_ptr<ColumnReader> array_reader(
- new ColumnReader(opts, meta, num_rows, file_name));
+ new ColumnReader(opts, meta, num_rows, path_desc));
// array reader do not need to init
array_reader->_sub_readers.resize(meta.children_columns_size());
array_reader->_sub_readers[0] = std::move(item_reader);
@@ -91,8 +91,8 @@ Status ColumnReader::create(const ColumnReaderOptions& opts, const ColumnMetaPB&
}
ColumnReader::ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
- uint64_t num_rows, const std::string& file_name)
- : _meta(meta), _opts(opts), _num_rows(num_rows), _file_name(file_name) {}
+ uint64_t num_rows, FilePathDesc path_desc)
+ : _meta(meta), _opts(opts), _num_rows(num_rows), _path_desc(path_desc) {}
ColumnReader::~ColumnReader() = default;
@@ -122,12 +122,12 @@ Status ColumnReader::init() {
break;
default:
return Status::Corruption(strings::Substitute(
- "Bad file $0: invalid column index type $1", _file_name, index_meta.type()));
+ "Bad file $0: invalid column index type $1", _path_desc.filepath, index_meta.type()));
}
}
if (_ordinal_index_meta == nullptr) {
return Status::Corruption(strings::Substitute(
- "Bad file $0: missing ordinal index for column $1", _file_name, _meta.column_id()));
+ "Bad file $0: missing ordinal index for column $1", _path_desc.filepath, _meta.column_id()));
}
return Status::OK();
}
@@ -290,13 +290,13 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(CondColumn* cond_column,
Status ColumnReader::_load_ordinal_index(bool use_page_cache, bool kept_in_memory) {
DCHECK(_ordinal_index_meta != nullptr);
- _ordinal_index.reset(new OrdinalIndexReader(_file_name, _ordinal_index_meta, _num_rows));
+ _ordinal_index.reset(new OrdinalIndexReader(_path_desc, _ordinal_index_meta, _num_rows));
return _ordinal_index->load(use_page_cache, kept_in_memory);
}
Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memory) {
if (_zone_map_index_meta != nullptr) {
- _zone_map_index.reset(new ZoneMapIndexReader(_file_name, _zone_map_index_meta));
+ _zone_map_index.reset(new ZoneMapIndexReader(_path_desc, _zone_map_index_meta));
return _zone_map_index->load(use_page_cache, kept_in_memory);
}
return Status::OK();
@@ -304,7 +304,7 @@ Status ColumnReader::_load_zone_map_index(bool use_page_cache, bool kept_in_memo
Status ColumnReader::_load_bitmap_index(bool use_page_cache, bool kept_in_memory) {
if (_bitmap_index_meta != nullptr) {
- _bitmap_index.reset(new BitmapIndexReader(_file_name, _bitmap_index_meta));
+ _bitmap_index.reset(new BitmapIndexReader(_path_desc, _bitmap_index_meta));
return _bitmap_index->load(use_page_cache, kept_in_memory);
}
return Status::OK();
@@ -312,7 +312,7 @@ Status ColumnReader::_load_bitmap_index(bool use_page_cache, bool kept_in_memory
Status ColumnReader::_load_bloom_filter_index(bool use_page_cache, bool kept_in_memory) {
if (_bf_index_meta != nullptr) {
- _bloom_filter_index.reset(new BloomFilterIndexReader(_file_name, _bf_index_meta));
+ _bloom_filter_index.reset(new BloomFilterIndexReader(_path_desc, _bf_index_meta));
return _bloom_filter_index->load(use_page_cache, kept_in_memory);
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index 0e546f5..0b3588c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -89,7 +89,7 @@ public:
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
- uint64_t num_rows, const std::string& file_name,
+ uint64_t num_rows, const FilePathDesc& path_desc,
std::unique_ptr<ColumnReader>* reader);
~ColumnReader();
@@ -132,7 +132,7 @@ public:
private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
- const std::string& file_name);
+ FilePathDesc path_desc);
Status init();
// Read and load necessary column indexes into memory if it hasn't been loaded.
@@ -167,7 +167,7 @@ private:
ColumnMetaPB _meta;
ColumnReaderOptions _opts;
uint64_t _num_rows;
- std::string _file_name;
+ FilePathDesc _path_desc;
const TypeInfo* _type_info = nullptr; // initialized in init(), may changed by subclasses.
const EncodingInfo* _encoding_info =
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 1ac6692..30bd7bd 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -41,8 +41,8 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
_value_key_coder = get_key_coder(_type_info->type());
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
- RETURN_IF_ERROR(block_mgr->open_block(_file_name, &rblock));
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_path_desc.storage_medium);
+ RETURN_IF_ERROR(block_mgr->open_block(_path_desc, &rblock));
// read and parse ordinal index page when exists
if (_meta.has_ordinal_index_meta()) {
if (_meta.ordinal_index_meta().is_root_data_page()) {
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
index 1cd42d8..94e3cd3 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h
@@ -20,6 +20,7 @@
#include <memory>
#include "common/status.h"
+#include "env/env.h"
#include "gen_cpp/segment_v2.pb.h"
#include "olap/column_block.h"
#include "olap/fs/fs_util.h"
@@ -44,8 +45,8 @@ class IndexedColumnIterator;
// thread-safe reader for IndexedColumn (see comments of `IndexedColumnWriter` to understand what IndexedColumn is)
class IndexedColumnReader {
public:
- explicit IndexedColumnReader(const std::string& file_name, const IndexedColumnMetaPB& meta)
- : _file_name(file_name), _meta(meta){};
+ explicit IndexedColumnReader(const FilePathDesc& path_desc, const IndexedColumnMetaPB& meta)
+ : _path_desc(path_desc), _meta(meta){};
Status load(bool use_page_cache, bool kept_in_memory);
@@ -65,7 +66,7 @@ private:
friend class IndexedColumnIterator;
- std::string _file_name;
+ FilePathDesc _path_desc;
IndexedColumnMetaPB _meta;
bool _use_page_cache;
@@ -93,10 +94,10 @@ public:
: _reader(reader),
_ordinal_iter(&reader->_ordinal_index_reader),
_value_iter(&reader->_value_index_reader) {
- fs::BlockManager* block_manager = fs::fs_util::block_manager();
- auto st = block_manager->open_block(_reader->_file_name, &_rblock);
+ fs::BlockManager* block_manager = fs::fs_util::block_manager(_reader->_path_desc.storage_medium);
+ auto st = block_manager->open_block(_reader->_path_desc, &_rblock);
DCHECK(st.ok());
- WARN_IF_ERROR(st, "open file failed:" + _reader->_file_name);
+ WARN_IF_ERROR(st, "open file failed:" + _reader->_path_desc.filepath);
}
// Seek to the given ordinal entry. Entry 0 is the first entry.
diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
index 6c4d94c..515c55b 100644
--- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
@@ -35,7 +35,7 @@ void OrdinalIndexWriter::append_entry(ordinal_t ordinal, const PagePointer& data
}
Status OrdinalIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB* meta) {
- CHECK(_page_builder->count() > 0) << "no entry has been added, filepath=" << wblock->path();
+ CHECK(_page_builder->count() > 0) << "no entry has been added, filepath=" << wblock->path_desc().filepath;
meta->set_type(ORDINAL_INDEX);
BTreeMetaPB* root_page_meta = meta->mutable_ordinal_index()->mutable_root_page();
@@ -69,8 +69,8 @@ Status OrdinalIndexReader::load(bool use_page_cache, bool kept_in_memory) {
}
// need to read index page
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
- RETURN_IF_ERROR(block_mgr->open_block(_filename, &rblock));
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_path_desc.storage_medium);
+ RETURN_IF_ERROR(block_mgr->open_block(_path_desc, &rblock));
PageReadOptions opts;
opts.rblock = rblock.get();
diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.h b/be/src/olap/rowset/segment_v2/ordinal_page_index.h
index 5eef5b4..855e966 100644
--- a/be/src/olap/rowset/segment_v2/ordinal_page_index.h
+++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.h
@@ -22,6 +22,7 @@
#include <string>
#include "common/status.h"
+#include "env/env.h"
#include "gutil/macros.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/index_page.h"
@@ -61,9 +62,9 @@ class OrdinalPageIndexIterator;
class OrdinalIndexReader {
public:
- explicit OrdinalIndexReader(const std::string& filename, const OrdinalIndexPB* index_meta,
+ explicit OrdinalIndexReader(const FilePathDesc& path_desc, const OrdinalIndexPB* index_meta,
ordinal_t num_values)
- : _filename(filename), _index_meta(index_meta), _num_values(num_values) {}
+ : _path_desc(path_desc), _index_meta(index_meta), _num_values(num_values) {}
// load and parse the index page into memory
Status load(bool use_page_cache, bool kept_in_memory);
@@ -83,7 +84,7 @@ public:
private:
friend OrdinalPageIndexIterator;
- std::string _filename;
+ FilePathDesc _path_desc;
const OrdinalIndexPB* _index_meta;
// total number of values (including NULLs) in the indexed column,
// equals to 1 + 'last ordinal of last data pages'
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp
index 8f455b1..739cde1 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -116,7 +116,7 @@ Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle*
auto cache = StoragePageCache::instance();
PageCacheHandle cache_handle;
- StoragePageCache::CacheKey cache_key(opts.rblock->path(), opts.page_pointer.offset);
+ StoragePageCache::CacheKey cache_key(opts.rblock->path_desc().filepath, opts.page_pointer.offset);
if (opts.use_page_cache && cache->is_cache_available(opts.type) && cache->lookup(cache_key, &cache_handle, opts.type)) {
// we find page in cache, use it
*handle = PageHandle(std::move(cache_handle));
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index fbfd52f..470efc0 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -37,16 +37,18 @@ namespace segment_v2 {
using strings::Substitute;
-Status Segment::open(std::string filename, uint32_t segment_id, const TabletSchema* tablet_schema,
+Status Segment::open(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema,
std::shared_ptr<Segment>* output) {
- std::shared_ptr<Segment> segment(new Segment(std::move(filename), segment_id, tablet_schema));
- RETURN_IF_ERROR(segment->_open());
+ std::shared_ptr<Segment> segment(new Segment(path_desc, segment_id, tablet_schema));
+ if (!Env::get_env(path_desc.storage_medium)->is_remote_env()) {
+ RETURN_IF_ERROR(segment->_open());
+ }
output->swap(segment);
return Status::OK();
}
-Segment::Segment(std::string fname, uint32_t segment_id, const TabletSchema* tablet_schema)
- : _fname(std::move(fname)), _segment_id(segment_id),
+Segment::Segment(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema)
+ : _path_desc(path_desc), _segment_id(segment_id),
_tablet_schema(tablet_schema) {
#ifndef BE_TEST
_mem_tracker = MemTracker::CreateTracker(-1, "Segment", StorageEngine::instance()->tablet_mem_tracker(), false);
@@ -62,12 +64,16 @@ Segment::~Segment() {
Status Segment::_open() {
RETURN_IF_ERROR(_parse_footer());
RETURN_IF_ERROR(_create_column_readers());
+ _is_open = true;
return Status::OK();
}
Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& read_options,
std::shared_ptr<MemTracker> parent,
std::unique_ptr<RowwiseIterator>* iter) {
+ if (!_is_open) {
+ RETURN_IF_ERROR(_open());
+ }
DCHECK_NOTNULL(read_options.stats);
read_options.stats->total_segment_number++;
// trying to prune the current segment by segment-level zone map
@@ -96,15 +102,15 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
Status Segment::_parse_footer() {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
- RETURN_IF_ERROR(block_mgr->open_block(_fname, &rblock));
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_path_desc.storage_medium);
+ RETURN_IF_ERROR(block_mgr->open_block(_path_desc, &rblock));
uint64_t file_size;
RETURN_IF_ERROR(rblock->size(&file_size));
if (file_size < 12) {
return Status::Corruption(
- strings::Substitute("Bad segment file $0: file size $1 < 12", _fname, file_size));
+ strings::Substitute("Bad segment file $0: file size $1 < 12", _path_desc.filepath, file_size));
}
uint8_t fixed_buf[12];
@@ -113,14 +119,14 @@ Status Segment::_parse_footer() {
// validate magic number
if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
return Status::Corruption(
- strings::Substitute("Bad segment file $0: magic number not match", _fname));
+ strings::Substitute("Bad segment file $0: magic number not match", _path_desc.filepath));
}
// read footer PB
uint32_t footer_length = decode_fixed32_le(fixed_buf);
if (file_size < 12 + footer_length) {
return Status::Corruption(strings::Substitute("Bad segment file $0: file size $1 < $2",
- _fname, file_size, 12 + footer_length));
+ _path_desc.filepath, file_size, 12 + footer_length));
}
_mem_tracker->Consume(footer_length);
@@ -133,14 +139,14 @@ Status Segment::_parse_footer() {
uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
if (actual_checksum != expect_checksum) {
return Status::Corruption(strings::Substitute(
- "Bad segment file $0: footer checksum not match, actual=$1 vs expect=$2", _fname,
- actual_checksum, expect_checksum));
+ "Bad segment file $0: footer checksum not match, actual=$1 vs expect=$2",
+ _path_desc.filepath, actual_checksum, expect_checksum));
}
// deserialize footer PB
if (!_footer.ParseFromString(footer_buf)) {
return Status::Corruption(strings::Substitute(
- "Bad segment file $0: failed to parse SegmentFooterPB", _fname));
+ "Bad segment file $0: failed to parse SegmentFooterPB", _path_desc.filepath));
}
return Status::OK();
}
@@ -149,8 +155,8 @@ Status Segment::_load_index() {
return _load_index_once.call([this] {
// read and parse short key index page
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
- RETURN_IF_ERROR(block_mgr->open_block(_fname, &rblock));
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_path_desc.storage_medium);
+ RETURN_IF_ERROR(block_mgr->open_block(_path_desc, &rblock));
PageReadOptions opts;
opts.rblock = rblock.get();
@@ -190,7 +196,7 @@ Status Segment::_create_column_readers() {
opts.kept_in_memory = _tablet_schema->is_in_memory();
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(opts, _footer.columns(iter->second),
- _footer.num_rows(), _fname, &reader));
+ _footer.num_rows(), _path_desc, &reader));
_column_readers[ordinal] = std::move(reader);
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index 0cfd408..56fc852 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -59,7 +59,7 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
// change finished, client should disable all cached Segment for old TabletSchema.
class Segment : public std::enable_shared_from_this<Segment> {
public:
- static Status open(std::string filename, uint32_t segment_id, const TabletSchema* tablet_schema,
+ static Status open(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema,
std::shared_ptr<Segment>* output);
~Segment();
@@ -105,7 +105,7 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(Segment);
- Segment(std::string fname, uint32_t segment_id, const TabletSchema* tablet_schema);
+ Segment(const FilePathDesc& path_desc, uint32_t segment_id, const TabletSchema* tablet_schema);
// open segment file and read the minimum amount of necessary information (footer)
Status _open();
Status _parse_footer();
@@ -116,7 +116,7 @@ private:
private:
friend class SegmentIterator;
- std::string _fname;
+ FilePathDesc _path_desc;
uint32_t _segment_id;
const TabletSchema* _tablet_schema;
@@ -141,6 +141,9 @@ private:
PageHandle _sk_index_handle;
// short key index decoder
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;
+ // For remote storage the segment footer is not read when the segment is opened, so _is_open stays false.
+ // The footer is read lazily the first time the segment is read (new_iterator), and _is_open is then set to true.
+ bool _is_open = false;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 6406d95..cb7e0d5 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -123,8 +123,8 @@ Status SegmentIterator::init(const StorageReadOptions& opts) {
Status SegmentIterator::_init() {
DorisMetrics::instance()->segment_read_total->increment(1);
// get file handle from file descriptor of segment
- fs::BlockManager* block_mgr = fs::fs_util::block_manager();
- RETURN_IF_ERROR(block_mgr->open_block(_segment->_fname, &_rblock));
+ fs::BlockManager* block_mgr = fs::fs_util::block_manager(_segment->_path_desc.storage_medium);
+ RETURN_IF_ERROR(block_mgr->open_block(_segment->_path_desc, &_rblock));
_row_bitmap.addRange(0, _segment->num_rows());
RETURN_IF_ERROR(_init_return_column_iterators());
RETURN_IF_ERROR(_init_bitmap_index_iterators());
diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp
index c051319..ce29cfd 100644
--- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp
+++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp
@@ -125,7 +125,7 @@ Status ZoneMapIndexWriter::finish(fs::WritableBlock* wblock, ColumnIndexMetaPB*
}
Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
- IndexedColumnReader reader(_filename, _index_meta->page_zone_maps());
+ IndexedColumnReader reader(_path_desc, _index_meta->page_zone_maps());
RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory));
IndexedColumnIterator iter(&reader);
diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.h b/be/src/olap/rowset/segment_v2/zone_map_index.h
index 6efc786..0c129c5 100644
--- a/be/src/olap/rowset/segment_v2/zone_map_index.h
+++ b/be/src/olap/rowset/segment_v2/zone_map_index.h
@@ -22,6 +22,7 @@
#include <vector>
#include "common/status.h"
+#include "env/env.h"
#include "gen_cpp/segment_v2.pb.h"
#include "olap/field.h"
#include "olap/rowset/segment_v2/binary_plain_page.h"
@@ -118,8 +119,8 @@ private:
class ZoneMapIndexReader {
public:
- explicit ZoneMapIndexReader(const std::string& filename, const ZoneMapIndexPB* index_meta)
- : _filename(filename), _index_meta(index_meta) {}
+ explicit ZoneMapIndexReader(const FilePathDesc& path_desc, const ZoneMapIndexPB* index_meta)
+ : _path_desc(path_desc), _index_meta(index_meta) {}
// load all page zone maps into memory
Status load(bool use_page_cache, bool kept_in_memory);
@@ -129,7 +130,7 @@ public:
int32_t num_pages() const { return _page_zone_maps.size(); }
private:
- std::string _filename;
+ FilePathDesc _path_desc;
const ZoneMapIndexPB* _index_meta;
std::vector<ZoneMapPB> _page_zone_maps;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 2effb36..05fecb1 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1354,7 +1354,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
context.partition_id = new_tablet->partition_id();
context.tablet_schema_hash = new_tablet->schema_hash();
context.rowset_type = new_rowset_type;
- context.rowset_path_prefix = new_tablet->tablet_path();
+ context.path_desc = new_tablet->tablet_path_desc();
context.tablet_schema = &(new_tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.version = version;
@@ -1760,7 +1760,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
writer_context.rowset_type = BETA_ROWSET;
}
- writer_context.rowset_path_prefix = new_tablet->tablet_path();
+ writer_context.path_desc = new_tablet->tablet_path_desc();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = PREPARED;
writer_context.txn_id = (*base_rowset)->txn_id();
@@ -1907,7 +1907,7 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// And in this case, linked schema change will not be used.
writer_context.rowset_type = BETA_ROWSET;
}
- writer_context.rowset_path_prefix = new_tablet->tablet_path();
+ writer_context.path_desc = new_tablet->tablet_path_desc();
writer_context.tablet_schema = &(new_tablet->tablet_schema());
writer_context.rowset_state = VISIBLE;
writer_context.version = rs_reader->version();
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 0573d1d..3789a1e 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -38,7 +38,6 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
-using std::filesystem::copy_file;
using std::filesystem::path;
using std::map;
using std::nothrow;
@@ -96,14 +95,17 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
// 否则认为是非法请求,返回错误结果
auto stores = StorageEngine::instance()->get_stores();
for (auto store : stores) {
+ if (store->is_remote()) {
+ continue;
+ }
std::string abs_path;
- RETURN_WITH_WARN_IF_ERROR(FileUtils::canonicalize(store->path(), &abs_path),
+ RETURN_WITH_WARN_IF_ERROR(store->env()->canonicalize(store->path(), &abs_path),
OLAP_ERR_DIR_NOT_EXIST,
"canonical path " + store->path() + "failed");
if (snapshot_path.compare(0, abs_path.size(), abs_path) == 0 &&
snapshot_path.compare(abs_path.size(), SNAPSHOT_PREFIX.size(), SNAPSHOT_PREFIX) == 0) {
- FileUtils::remove_all(snapshot_path);
+ store->env()->delete_dir(snapshot_path);
LOG(INFO) << "success to release snapshot path. [path='" << snapshot_path << "']";
return OLAP_SUCCESS;
@@ -117,18 +119,18 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
// TODO support beta rowset
// For now, alpha and beta rowset meta have same fields, so we can just use
// AlphaRowsetMeta here.
-OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
+OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id,
const int32_t& schema_hash) {
OLAPStatus res = OLAP_SUCCESS;
// check clone dir existed
- if (!FileUtils::check_exist(clone_dir)) {
+ if (!FileUtils::check_exist(clone_dir_desc.filepath)) {
res = OLAP_ERR_DIR_NOT_EXIST;
- LOG(WARNING) << "clone dir not existed when convert rowsetids. clone_dir=" << clone_dir;
+ LOG(WARNING) << "clone dir not existed when convert rowsetids. clone_dir=" << clone_dir_desc.debug_string();
return res;
}
// load original tablet meta
- string cloned_meta_file = clone_dir + "/" + std::to_string(tablet_id) + ".hdr";
+ string cloned_meta_file = clone_dir_desc.filepath + "/" + std::to_string(tablet_id) + ".hdr";
TabletMeta cloned_tablet_meta;
if ((res = cloned_tablet_meta.create_from_file(cloned_meta_file)) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to load original tablet meta after clone. "
@@ -156,7 +158,7 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t
for (auto& visible_rowset : cloned_tablet_meta_pb.rs_metas()) {
RowsetMetaPB* rowset_meta = new_tablet_meta_pb.add_rs_metas();
RowsetId rowset_id = StorageEngine::instance()->next_rowset_id();
- RETURN_NOT_OK(_rename_rowset_id(visible_rowset, clone_dir, tablet_schema, rowset_id,
+ RETURN_NOT_OK(_rename_rowset_id(visible_rowset, clone_dir_desc, tablet_schema, rowset_id,
rowset_meta));
rowset_meta->set_tablet_id(tablet_id);
rowset_meta->set_tablet_schema_hash(schema_hash);
@@ -173,14 +175,14 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t
RowsetMetaPB* rowset_meta = new_tablet_meta_pb.add_stale_rs_metas();
RowsetId rowset_id = StorageEngine::instance()->next_rowset_id();
RETURN_NOT_OK(
- _rename_rowset_id(stale_rowset, clone_dir, tablet_schema, rowset_id, rowset_meta));
+ _rename_rowset_id(stale_rowset, clone_dir_desc, tablet_schema, rowset_id, rowset_meta));
rowset_meta->set_tablet_id(tablet_id);
rowset_meta->set_tablet_schema_hash(schema_hash);
}
res = TabletMeta::save(cloned_meta_file, new_tablet_meta_pb);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to save converted tablet meta to dir='" << clone_dir;
+ LOG(WARNING) << "fail to save converted tablet meta to dir='" << clone_dir_desc.filepath;
return res;
}
@@ -188,7 +190,7 @@ OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, int64_t
}
OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
- const string& new_path, TabletSchema& tablet_schema,
+ const FilePathDesc& new_path_desc, TabletSchema& tablet_schema,
const RowsetId& rowset_id,
RowsetMetaPB* new_rs_meta_pb) {
OLAPStatus res = OLAP_SUCCESS;
@@ -202,7 +204,7 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
alpha_rowset_meta->init_from_pb(rs_meta_pb);
RowsetSharedPtr org_rowset;
RETURN_NOT_OK(
- RowsetFactory::create_rowset(&tablet_schema, new_path, alpha_rowset_meta, &org_rowset));
+ RowsetFactory::create_rowset(&tablet_schema, new_path_desc, alpha_rowset_meta, &org_rowset));
// do not use cache to load index
// because the index file may conflict
// and the cached fd may be invalid
@@ -214,7 +216,7 @@ OLAPStatus SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
context.partition_id = org_rowset_meta->partition_id();
context.tablet_schema_hash = org_rowset_meta->tablet_schema_hash();
context.rowset_type = org_rowset_meta->rowset_type();
- context.rowset_path_prefix = new_path;
+ context.path_desc = new_path_desc;
context.tablet_schema = &tablet_schema;
context.rowset_state = org_rowset_meta->rowset_state();
context.version = org_rowset_meta->version();
@@ -270,14 +272,12 @@ OLAPStatus SnapshotManager::_calc_snapshot_id_path(const TabletSharedPtr& tablet
// location: /path/to/data/DATA_PREFIX/shard_id
// return: /path/to/data/DATA_PREFIX/shard_id/tablet_id/schema_hash
-std::string SnapshotManager::get_schema_hash_full_path(const TabletSharedPtr& ref_tablet,
- const string& location) const {
- std::stringstream schema_full_path_stream;
- schema_full_path_stream << location << "/" << ref_tablet->tablet_id() << "/"
- << ref_tablet->schema_hash();
- string schema_full_path = schema_full_path_stream.str();
-
- return schema_full_path;
+FilePathDesc SnapshotManager::get_schema_hash_full_path(const TabletSharedPtr& ref_tablet,
+ const FilePathDesc& location_desc) const {
+ FilePathDescStream schema_full_path_desc_s;
+ schema_full_path_desc_s << location_desc << "/" << ref_tablet->tablet_id()
+ << "/" << ref_tablet->schema_hash();
+ return schema_full_path_desc_s.path_desc();
}
std::string SnapshotManager::_get_header_full_path(const TabletSharedPtr& ref_tablet,
@@ -288,11 +288,11 @@ std::string SnapshotManager::_get_header_full_path(const TabletSharedPtr& ref_ta
}
OLAPStatus SnapshotManager::_link_index_and_data_files(
- const string& schema_hash_path, const TabletSharedPtr& ref_tablet,
+ const FilePathDesc& schema_hash_path_desc, const TabletSharedPtr& ref_tablet,
const std::vector<RowsetSharedPtr>& consistent_rowsets) {
OLAPStatus res = OLAP_SUCCESS;
for (auto& rs : consistent_rowsets) {
- RETURN_NOT_OK(rs->link_files_to(schema_hash_path, rs->rowset_id()));
+ RETURN_NOT_OK(rs->link_files_to(schema_hash_path_desc, rs->rowset_id()));
}
return res;
}
@@ -325,19 +325,19 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
return res;
}
- // schema_full_path:
+ // schema_full_path_desc.filepath:
// /snapshot_id_path/tablet_id/schema_hash/
- string schema_full_path = get_schema_hash_full_path(ref_tablet, snapshot_id_path);
+ FilePathDesc schema_full_path_desc = get_schema_hash_full_path(ref_tablet, snapshot_id_path);
// header_path:
// /schema_full_path/tablet_id.hdr
- string header_path = _get_header_full_path(ref_tablet, schema_full_path);
- if (FileUtils::check_exist(schema_full_path)) {
+ string header_path = _get_header_full_path(ref_tablet, schema_full_path_desc.filepath);
+ if (FileUtils::check_exist(schema_full_path_desc.filepath)) {
VLOG_TRACE << "remove the old schema_full_path.";
- FileUtils::remove_all(schema_full_path);
+ FileUtils::remove_all(schema_full_path_desc.filepath);
}
- RETURN_WITH_WARN_IF_ERROR(FileUtils::create_dir(schema_full_path), OLAP_ERR_CANNOT_CREATE_DIR,
- "create path " + schema_full_path + "failed");
+ RETURN_WITH_WARN_IF_ERROR(FileUtils::create_dir(schema_full_path_desc.filepath), OLAP_ERR_CANNOT_CREATE_DIR,
+ "create path " + schema_full_path_desc.filepath + "failed");
string snapshot_id;
RETURN_WITH_WARN_IF_ERROR(FileUtils::canonicalize(snapshot_id_path, &snapshot_id),
@@ -423,7 +423,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
std::vector<RowsetMetaSharedPtr> rs_metas;
for (auto& rs : consistent_rowsets) {
- res = rs->link_files_to(schema_full_path, rs->rowset_id());
+ res = rs->link_files_to(schema_full_path_desc, rs->rowset_id());
if (res != OLAP_SUCCESS) {
break;
}
@@ -445,8 +445,8 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
if (snapshot_version == g_Types_constants.TSNAPSHOT_REQ_VERSION1) {
// convert beta rowset to alpha rowset
- res = _convert_beta_rowsets_to_alpha(new_tablet_meta, new_tablet_meta->all_rs_metas(),
- schema_full_path);
+ res = _convert_beta_rowsets_to_alpha(
+ new_tablet_meta, new_tablet_meta->all_rs_metas(), schema_full_path_desc);
if (res != OLAP_SUCCESS) {
break;
}
@@ -488,7 +488,7 @@ OLAPStatus SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_ta
OLAPStatus SnapshotManager::_convert_beta_rowsets_to_alpha(
const TabletMetaSharedPtr& new_tablet_meta,
- const std::vector<RowsetMetaSharedPtr>& rowset_metas, const std::string& dst_path) {
+ const std::vector<RowsetMetaSharedPtr>& rowset_metas, const FilePathDesc& dst_path_desc) {
OLAPStatus res = OLAP_SUCCESS;
RowsetConverter rowset_converter(new_tablet_meta);
std::vector<RowsetMetaSharedPtr> new_rowset_metas;
@@ -498,7 +498,7 @@ OLAPStatus SnapshotManager::_convert_beta_rowsets_to_alpha(
modified = true;
RowsetMetaPB rowset_meta_pb;
auto st =
- rowset_converter.convert_beta_to_alpha(rowset_meta, dst_path, &rowset_meta_pb);
+ rowset_converter.convert_beta_to_alpha(rowset_meta, dst_path_desc, &rowset_meta_pb);
if (st != OLAP_SUCCESS) {
res = st;
LOG(WARNING) << "convert beta to alpha failed"
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index d3685a4..0efa647 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -52,8 +52,8 @@ public:
OLAPStatus make_snapshot(const TSnapshotRequest& request, std::string* snapshot_path,
bool* allow_incremental_clone);
- std::string get_schema_hash_full_path(const TabletSharedPtr& ref_tablet,
- const std::string& location) const;
+ FilePathDesc get_schema_hash_full_path(const TabletSharedPtr& ref_tablet,
+ const FilePathDesc& location_desc) const;
// @brief 释放snapshot
// @param snapshot_path [in] 要被释放的snapshot的路径,只包含到ID
@@ -61,7 +61,7 @@ public:
static SnapshotManager* instance();
- OLAPStatus convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
+ OLAPStatus convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id,
const int32_t& schema_hash);
private:
@@ -73,7 +73,7 @@ private:
std::string _get_header_full_path(const TabletSharedPtr& ref_tablet,
const std::string& schema_hash_path) const;
- OLAPStatus _link_index_and_data_files(const std::string& header_path,
+ OLAPStatus _link_index_and_data_files(const FilePathDesc& header_path_desc,
const TabletSharedPtr& ref_tablet,
const std::vector<RowsetSharedPtr>& consistent_rowsets);
@@ -84,13 +84,13 @@ private:
OLAPStatus _prepare_snapshot_dir(const TabletSharedPtr& ref_tablet,
std::string* snapshot_id_path);
- OLAPStatus _rename_rowset_id(const RowsetMetaPB& rs_meta_pb, const string& new_path,
+ OLAPStatus _rename_rowset_id(const RowsetMetaPB& rs_meta_pb, const FilePathDesc& new_path_desc,
TabletSchema& tablet_schema, const RowsetId& next_id,
RowsetMetaPB* new_rs_meta_pb);
OLAPStatus _convert_beta_rowsets_to_alpha(const TabletMetaSharedPtr& new_tablet_meta,
const vector<RowsetMetaSharedPtr>& rowset_metas,
- const std::string& dst_path);
+ const FilePathDesc& dst_path_desc);
private:
static SnapshotManager* _s_instance;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index ba3320f..9de74cd 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -203,7 +203,7 @@ Status StorageEngine::_init_store_map() {
std::string error_msg;
for (auto& path : _options.store_paths) {
DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium,
- _tablet_manager.get(), _txn_manager.get());
+ path.remote_path, _tablet_manager.get(), _txn_manager.get());
tmp_stores.emplace_back(store);
threads.emplace_back([store, &error_msg_lock, &error_msg]() {
auto st = store->init();
@@ -428,7 +428,7 @@ Status StorageEngine::_check_all_root_path_cluster_id() {
int32_t cluster_id = -1;
for (auto& it : _store_map) {
int32_t tmp_cluster_id = it.second->cluster_id();
- if (tmp_cluster_id == -1) {
+ if (it.second->cluster_id_incomplete()) {
_is_all_cluster_id_exist = false;
} else if (tmp_cluster_id == cluster_id) {
// both have right cluster id, do nothing
@@ -669,7 +669,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
double tmp_usage = 0.0;
for (DataDirInfo& info : data_dir_infos) {
- LOG(INFO) << "Start to sweep path " << info.path;
+ LOG(INFO) << "Start to sweep path " << info.path_desc.filepath;
if (!info.is_used) {
continue;
}
@@ -678,7 +678,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
tmp_usage = std::max(tmp_usage, curr_usage);
OLAPStatus curr_res = OLAP_SUCCESS;
- string snapshot_path = info.path + SNAPSHOT_PREFIX;
+ string snapshot_path = info.path_desc.filepath + SNAPSHOT_PREFIX;
curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
if (curr_res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path
@@ -686,7 +686,7 @@ OLAPStatus StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
res = curr_res;
}
- string trash_path = info.path + TRASH_PREFIX;
+ string trash_path = info.path_desc.filepath + TRASH_PREFIX;
curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ? 0 : trash_expire);
if (curr_res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to sweep trash. [path=%s" << trash_path
@@ -745,6 +745,9 @@ void StorageEngine::_clean_unused_rowset_metas() {
};
auto data_dirs = get_stores();
for (auto data_dir : data_dirs) {
+ if (data_dir->is_remote()) {
+ continue;
+ }
RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(), clean_rowset_func);
for (auto& rowset_meta : invalid_rowset_metas) {
RowsetMetaManager::remove(data_dir->get_meta(), rowset_meta->tablet_uid(),
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 5be3837..aae8e1e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -99,7 +99,7 @@ OLAPStatus Tablet::_init_once_action() {
for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
Version version = rs_meta->version();
RowsetSharedPtr rowset;
- res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
+ res = RowsetFactory::create_rowset(&_schema, _tablet_path_desc, rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init rowset. tablet_id=" << tablet_id()
<< ", schema_hash=" << schema_hash() << ", version=" << version
@@ -113,7 +113,7 @@ OLAPStatus Tablet::_init_once_action() {
for (auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) {
Version version = stale_rs_meta->version();
RowsetSharedPtr rowset;
- res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset);
+ res = RowsetFactory::create_rowset(&_schema, _tablet_path_desc, stale_rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id()
<< ", schema_hash:" << schema_hash() << ", version=" << version
@@ -183,7 +183,7 @@ OLAPStatus Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& ro
for (auto& rs_meta : rowsets_to_clone) {
Version version = {rs_meta->start_version(), rs_meta->end_version()};
RowsetSharedPtr rowset;
- res = RowsetFactory::create_rowset(&_schema, _tablet_path, rs_meta, &rowset);
+ res = RowsetFactory::create_rowset(&_schema, _tablet_path_desc, rs_meta, &rowset);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init rowset. version=" << version;
return res;
@@ -945,10 +945,10 @@ void Tablet::delete_all_files() {
bool Tablet::check_path(const std::string& path_to_check) const {
ReadLock rdlock(&_meta_lock);
- if (path_to_check == _tablet_path) {
+ if (path_to_check == _tablet_path_desc.filepath) {
return true;
}
- std::string tablet_id_dir = path_util::dir_name(_tablet_path);
+ std::string tablet_id_dir = path_util::dir_name(_tablet_path_desc.filepath);
if (path_to_check == tablet_id_dir) {
return true;
}
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9054cdb..3cb0859 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -30,6 +30,7 @@
#include <filesystem>
#include "env/env.h"
+#include "env/env_util.h"
#include "gutil/strings/strcat.h"
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
@@ -115,10 +116,10 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s
}
if (!force) {
- if (existed_tablet->tablet_path() == tablet->tablet_path()) {
+ if (existed_tablet->tablet_path_desc().filepath == tablet->tablet_path_desc().filepath) {
LOG(WARNING) << "add the same tablet twice! tablet_id=" << tablet_id
<< ", schema_hash=" << schema_hash
- << ", tablet_path=" << tablet->tablet_path();
+ << ", tablet_path=" << tablet->tablet_path_desc().filepath;
return OLAP_ERR_ENGINE_INSERT_EXISTS_TABLE;
}
if (existed_tablet->data_dir() == tablet->data_dir()) {
@@ -168,8 +169,8 @@ OLAPStatus TabletManager::_add_tablet_unlocked(TTabletId tablet_id, SchemaHash s
<< ", tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
<< ", old_version=" << old_version << ", new_version=" << new_version
<< ", old_time=" << old_time << ", new_time=" << new_time
- << ", old_tablet_path=" << existed_tablet->tablet_path()
- << ", new_tablet_path=" << tablet->tablet_path();
+ << ", old_tablet_path=" << existed_tablet->tablet_path_desc().debug_string()
+ << ", new_tablet_path=" << tablet->tablet_path_desc().debug_string();
return res;
}
@@ -763,9 +764,9 @@ OLAPStatus TabletManager::load_tablet_from_meta(DataDir* data_dir, TTabletId tab
// For case 2, If a tablet has just been copied to local BE,
// it may be cleared by gc-thread(see perform_path_gc_by_tablet) because the tablet meta may not be loaded to memory.
// So clone task should check path and then failed and retry in this case.
- if (check_path && !Env::Default()->path_exists(tablet->tablet_path()).ok()) {
+ if (check_path && !Env::Default()->path_exists(tablet->tablet_path_desc().filepath).ok()) {
LOG(WARNING) << "tablet path not exists, create tablet failed, path="
- << tablet->tablet_path();
+ << tablet->tablet_path_desc().filepath;
return OLAP_ERR_TABLE_ALREADY_DELETED_ERROR;
}
@@ -1002,16 +1003,16 @@ OLAPStatus TabletManager::start_trash_sweep() {
continue;
}
// move data to trash
- string tablet_path = (*it)->tablet_path();
- if (Env::Default()->path_exists(tablet_path).ok()) {
+ FilePathDesc tablet_path_desc = (*it)->tablet_path_desc();
+ if (Env::Default()->path_exists(tablet_path_desc.filepath).ok()) {
// take snapshot of tablet meta
string meta_file_path = path_util::join_path_segments(
- (*it)->tablet_path(), std::to_string((*it)->tablet_id()) + ".hdr");
+ tablet_path_desc.filepath, std::to_string((*it)->tablet_id()) + ".hdr");
(*it)->tablet_meta()->save(meta_file_path);
- LOG(INFO) << "start to move tablet to trash. tablet_path = " << tablet_path;
- OLAPStatus rm_st = move_to_trash(tablet_path, tablet_path);
+ LOG(INFO) << "start to move tablet to trash. " << tablet_path_desc.debug_string();
+ OLAPStatus rm_st = move_to_trash(tablet_path_desc.filepath, tablet_path_desc.filepath);
if (rm_st != OLAP_SUCCESS) {
- LOG(WARNING) << "fail to move dir to trash. dir=" << tablet_path;
+ LOG(WARNING) << "fail to move dir to trash. " << tablet_path_desc.debug_string();
++it;
continue;
}
@@ -1022,13 +1023,13 @@ OLAPStatus TabletManager::start_trash_sweep() {
LOG(INFO) << "successfully move tablet to trash. "
<< "tablet_id=" << (*it)->tablet_id()
<< ", schema_hash=" << (*it)->schema_hash()
- << ", tablet_path=" << tablet_path;
+ << ", tablet_path=" << tablet_path_desc.debug_string();
it = _shutdown_tablets.erase(it);
++clean_num;
} else {
// if could not find tablet info in meta store, then check if dir existed
- string tablet_path = (*it)->tablet_path();
- if (Env::Default()->path_exists(tablet_path).ok()) {
+ FilePathDesc tablet_path_desc = (*it)->tablet_path_desc();
+ if (Env::Default()->path_exists(tablet_path_desc.filepath).ok()) {
LOG(WARNING) << "errors while load meta from store, skip this tablet. "
<< "tablet_id=" << (*it)->tablet_id()
<< ", schema_hash=" << (*it)->schema_hash();
@@ -1037,7 +1038,7 @@ OLAPStatus TabletManager::start_trash_sweep() {
LOG(INFO) << "could not find tablet dir, skip it and remove it from gc-queue. "
<< "tablet_id=" << (*it)->tablet_id()
<< ", schema_hash=" << (*it)->schema_hash()
- << ", tablet_path=" << tablet_path;
+ << ", tablet_path=" << tablet_path_desc.filepath;
it = _shutdown_tablets.erase(it);
}
}
@@ -1238,7 +1239,7 @@ OLAPStatus TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq
DCHECK(false);
context.rowset_type = StorageEngine::instance()->default_rowset_type();
}
- context.rowset_path_prefix = tablet->tablet_path();
+ context.path_desc = tablet->tablet_path_desc();
context.tablet_schema = &(tablet->tablet_schema());
context.rowset_state = VISIBLE;
context.version = version;
@@ -1352,7 +1353,7 @@ OLAPStatus TabletManager::_drop_tablet_directly_unlocked(TTabletId tablet_id,
WriteLock wrlock(tablet->get_header_lock_ptr());
LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
<< "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash
- << ", tablet_path=" << dropped_tablet->tablet_path();
+ << ", tablet_path=" << dropped_tablet->tablet_path_desc().filepath;
// NOTE: has to update tablet here, but must not update tablet meta directly.
// because other thread may hold the tablet object, they may save meta too.
// If update meta directly here, other thread may override the meta
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 8739486..249e554 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -42,7 +42,8 @@ OLAPStatus TabletMeta::create(const TCreateTabletReq& request, const TabletUid&
request.table_id, request.partition_id, request.tablet_id,
request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id,
col_ordinal_to_unique_id, tablet_uid,
- request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK));
+ request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
+ request.storage_medium)); // NOTE(review): unlike tablet_type, not guarded by __isset — confirm the default when unset
return OLAP_SUCCESS;
}
@@ -52,7 +53,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
int32_t schema_hash, uint64_t shard_id, const TTabletSchema& tablet_schema,
uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
- TabletUid tablet_uid, TTabletType::type tabletType)
+ TabletUid tablet_uid, TTabletType::type tabletType,
+ TStorageMedium::type t_storage_medium)
: _tablet_uid(0, 0), _schema(new TabletSchema) {
TabletMetaPB tablet_meta_pb;
tablet_meta_pb.set_table_id(table_id);
@@ -67,6 +69,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
tablet_meta_pb.set_tablet_type(tabletType == TTabletType::TABLET_TYPE_DISK
? TabletTypePB::TABLET_TYPE_DISK
: TabletTypePB::TABLET_TYPE_MEMORY);
+ tablet_meta_pb.set_storage_medium(fs::fs_util::get_storage_medium_pb(t_storage_medium)); // persist medium in the meta PB
TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 069507e..9289ab3 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -81,7 +81,7 @@ public:
TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id, int32_t schema_hash,
uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
- TabletUid tablet_uid, TTabletType::type tabletType);
+ TabletUid tablet_uid, TTabletType::type tabletType, TStorageMedium::type t_storage_medium);
// If need add a filed in TableMeta, filed init copy in copy construct function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 5af17e7..25a863c 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -82,7 +82,7 @@ OLAPStatus EngineCloneTask::_do_clone() {
}
// get download path
- string local_data_path = tablet->tablet_path() + CLONE_PREFIX;
+ string local_data_path = tablet->tablet_path_desc().filepath + CLONE_PREFIX;
bool allow_incremental_clone = false;
// try to incremental clone
@@ -562,7 +562,7 @@ OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_di
}
set<string> local_files;
- string tablet_dir = tablet->tablet_path();
+ string tablet_dir = tablet->tablet_path_desc().filepath;
ret = FileUtils::list_dirs_files(tablet_dir, nullptr, &local_files, Env::Default());
if (!ret.ok()) {
LOG(WARNING) << "failed to list local tablet dir when clone. [tablet_dir=" << tablet_dir
@@ -761,7 +761,7 @@ OLAPStatus EngineCloneTask::_finish_full_clone(Tablet* tablet, TabletMeta* clone
RowsetSharedPtr rowset_to_remove;
auto s =
RowsetFactory::create_rowset(&(cloned_tablet_meta->tablet_schema()),
- tablet->tablet_path(), rs_meta_ptr, &rowset_to_remove);
+ tablet->tablet_path_desc().filepath, rs_meta_ptr, &rowset_to_remove);
if (s != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init rowset to remove: "
<< rs_meta_ptr->rowset_id().to_string();
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 7894423..9b2269b 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -91,10 +91,11 @@ OLAPStatus EngineStorageMigrationTask::_migrate() {
LOG(WARNING) << "fail to get shard from store: " << _dest_store->path();
break;
}
- std::stringstream root_path_stream;
- root_path_stream << _dest_store->path() << DATA_PREFIX << "/" << shard;
- string full_path = SnapshotManager::instance()->get_schema_hash_full_path(
- _tablet, root_path_stream.str());
+ FilePathDescStream root_path_desc_s;
+ root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" << shard;
+ FilePathDesc full_path_desc = SnapshotManager::instance()->get_schema_hash_full_path(
+ _tablet, root_path_desc_s.path_desc());
+ string full_path = full_path_desc.filepath;
// if dir already exist then return err, it should not happen.
// should not remove the dir directly, for safety reason.
if (FileUtils::check_exist(full_path)) {
diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp
index 8c8ab69..c37138e 100644
--- a/be/src/olap/utils.cpp
+++ b/be/src/olap/utils.cpp
@@ -656,64 +656,6 @@ int operator-(const BinarySearchIterator& left, const BinarySearchIterator& righ
return *left - *right;
}
-OLAPStatus copy_file(const string& src, const string& dest) {
- int src_fd = -1;
- int dest_fd = -1;
- char buf[1024 * 1024];
- OLAPStatus res = OLAP_SUCCESS;
-
- src_fd = ::open(src.c_str(), O_RDONLY);
- if (src_fd < 0) {
- char errmsg[64];
- LOG(WARNING) << "failed to open file. [err='" << strerror_r(errno, errmsg, 64)
- << "' file_name=" << src << "]";
- res = OLAP_ERR_FILE_NOT_EXIST;
- goto COPY_EXIT;
- }
-
- dest_fd = ::open(dest.c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
- if (dest_fd < 0) {
- char errmsg[64];
- LOG(WARNING) << "failed to open file to write. [err='" << strerror_r(errno, errmsg, 64)
- << "' file_name=" << dest << "]";
- res = OLAP_ERR_FILE_NOT_EXIST;
- goto COPY_EXIT;
- }
-
- while (true) {
- ssize_t rd_size = ::read(src_fd, buf, sizeof(buf));
- if (rd_size < 0) {
- OLAP_LOG_WARNING("failed to read from file. [err=%m file_name=%s fd=%d size=%ld]",
- src.c_str(), src_fd, rd_size);
- return OLAP_ERR_IO_ERROR;
- } else if (0 == rd_size) {
- break;
- }
-
- ssize_t wr_size = ::write(dest_fd, buf, rd_size);
- if (wr_size != rd_size) {
- OLAP_LOG_WARNING(
- "failed to write to file. [err=%m file_name=%s fd=%d rd_size=%ld "
- "wr_size=%ld]",
- dest.c_str(), dest_fd, rd_size, wr_size);
- res = OLAP_ERR_IO_ERROR;
- goto COPY_EXIT;
- }
- }
-
-COPY_EXIT:
- if (src_fd >= 0) {
- ::close(src_fd);
- }
-
- if (dest_fd >= 0) {
- ::close(dest_fd);
- }
-
- VLOG_NOTICE << "copy file success. [src=" << src << " dest=" << dest << "]";
-
- return res;
-}
OLAPStatus read_write_test_file(const string& test_file_path) {
if (access(test_file_path.c_str(), F_OK) == 0) {
if (remove(test_file_path.c_str()) != 0) {
@@ -801,64 +743,6 @@ bool check_datapath_rw(const string& path) {
return false;
}
-OLAPStatus copy_dir(const string& src_dir, const string& dst_dir) {
- std::filesystem::path src_path(src_dir);
- std::filesystem::path dst_path(dst_dir);
-
- try {
- // Check whether the function call is valid
- if (!std::filesystem::exists(src_path) || !std::filesystem::is_directory(src_path)) {
- OLAP_LOG_WARNING("Source dir not exist or is not a dir.[src_path=%s]",
- src_path.string().c_str());
- return OLAP_ERR_CREATE_FILE_ERROR;
- }
-
- if (std::filesystem::exists(dst_path)) {
- LOG(WARNING) << "Dst dir already exists.[dst_path=" << dst_path.string() << "]";
- return OLAP_ERR_CREATE_FILE_ERROR;
- }
-
- // Create the destination directory
- if (!std::filesystem::create_directory(dst_path)) {
- LOG(WARNING) << "Unable to create dst dir.[dst_path=" << dst_path.string() << "]";
- return OLAP_ERR_CREATE_FILE_ERROR;
- }
- } catch (...) {
- OLAP_LOG_WARNING("input invalid[src_path=%s dst_path=%s]", src_path.string().c_str(),
- dst_path.string().c_str());
- ;
- return OLAP_ERR_STL_ERROR;
- }
-
- // Iterate through the source directory
- for (std::filesystem::directory_iterator file(src_path);
- file != std::filesystem::directory_iterator(); ++file) {
- try {
- std::filesystem::path current = file->path();
- if (std::filesystem::is_directory(current)) {
- // Found directory: Recursion
- OLAPStatus res = OLAP_SUCCESS;
- if (OLAP_SUCCESS !=
- (res = copy_dir(current.string(), (dst_path / current.filename()).string()))) {
- OLAP_LOG_WARNING("Fail to copy file.[src_path=%s dst_path=%s res=%d]",
- src_path.string().c_str(), dst_path.string().c_str(), res);
- ;
- return OLAP_ERR_CREATE_FILE_ERROR;
- }
- } else {
- // Found file: Copy
- std::filesystem::copy_file(current, (dst_path / current.filename()).string());
- }
- } catch (...) {
- OLAP_LOG_WARNING("Fail to copy file.[src_path=%s dst_path=%s]",
- src_path.string().c_str(), dst_path.string().c_str());
- ;
- return OLAP_ERR_STL_ERROR;
- }
- }
- return OLAP_SUCCESS;
-}
-
__thread char Errno::_buf[BUF_SIZE]; ///< buffer instance
const char* Errno::str() {
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index a90f9c0..ca27805 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -196,10 +196,6 @@ int operator-(const BinarySearchIterator& left, const BinarySearchIterator& righ
// 不用sse4指令的crc32c的计算函数
unsigned int crc32c_lut(char const* b, unsigned int off, unsigned int len, unsigned int crc);
-OLAPStatus copy_file(const std::string& src, const std::string& dest);
-
-OLAPStatus copy_dir(const std::string& src_dir, const std::string& dst_dir);
-
bool check_datapath_rw(const std::string& path);
OLAPStatus read_write_test_file(const std::string& test_file_path);
diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp
index e3f5f35..2309352 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -104,7 +104,7 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
// 2.1 get existing files from remote path
std::map<std::string, FileStat> remote_files;
- RETURN_IF_ERROR(_storage_backend->list(dest_path, &remote_files));
+ RETURN_IF_ERROR(_storage_backend->list(dest_path, /*contain_md5=*/true, /*recursion=*/false, &remote_files));
for (auto& tmp : remote_files) {
VLOG_CRITICAL << "get remote file: " << tmp.first << ", checksum: " << tmp.second.md5;
@@ -217,7 +217,7 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
// 2.2. get remote files
std::map<std::string, FileStat> remote_files;
- RETURN_IF_ERROR(_storage_backend->list(remote_path, &remote_files));
+ RETURN_IF_ERROR(_storage_backend->list(remote_path, /*contain_md5=*/true, /*recursion=*/false, &remote_files));
if (remote_files.empty()) {
std::stringstream ss;
ss << "get nothing from remote path: " << remote_path;
@@ -360,8 +360,8 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet,
bool overwrite) {
- std::string tablet_path = tablet->tablet_path();
- std::string store_path = tablet->data_dir()->path();
+ std::string tablet_path = tablet->tablet_path_desc().filepath;
+ std::string store_path = tablet->data_dir()->path_desc().filepath;
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
<< ", store: " << store_path << ", job: " << _job_id << ", task id: " << _task_id;
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 3aea427..05eac43 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -211,7 +211,7 @@ int64_t BackendService::get_trash_used_capacity() {
StorageEngine::instance()->get_all_data_dir_info(&data_dir_infos, false /*do not update */);
for (const auto& root_path_info : data_dir_infos) {
- std::string lhs_trash_path = root_path_info.path + TRASH_PREFIX;
+ std::string lhs_trash_path = root_path_info.path_desc.filepath + TRASH_PREFIX;
std::filesystem::path trash_path(lhs_trash_path);
result += StorageEngine::instance()->get_file_or_directory_size(trash_path);
}
@@ -225,11 +225,11 @@ void BackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& d
for (const auto& root_path_info : data_dir_infos) {
TDiskTrashInfo diskTrashInfo;
- diskTrashInfo.__set_root_path(root_path_info.path);
+ diskTrashInfo.__set_root_path(root_path_info.path_desc.filepath);
diskTrashInfo.__set_state(root_path_info.is_used ? "ONLINE" : "OFFLINE");
- std::string lhs_trash_path = root_path_info.path + TRASH_PREFIX;
+ std::string lhs_trash_path = root_path_info.path_desc.filepath + TRASH_PREFIX;
std::filesystem::path trash_path(lhs_trash_path);
diskTrashInfo.__set_trash_used_capacity(
StorageEngine::instance()->get_file_or_directory_size(trash_path));
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 0b26fea..fba3669 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -40,6 +40,7 @@
#include "common/logging.h"
#include "common/resource_tls.h"
#include "common/status.h"
+#include "env/env.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
@@ -145,6 +146,11 @@ int main(int argc, char** argv) {
}
#endif
+ if (!doris::Env::init()) {
+ LOG(FATAL) << "init env failed.";
+ exit(-1);
+ }
+
std::vector<doris::StorePath> paths;
auto olap_res = doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
if (olap_res != doris::OLAP_SUCCESS) {
diff --git a/be/src/tools/meta_tool.cpp b/be/src/tools/meta_tool.cpp
index 39fe84b..4979ac4 100644
--- a/be/src/tools/meta_tool.cpp
+++ b/be/src/tools/meta_tool.cpp
@@ -160,7 +160,7 @@ Status init_data_dir(const std::string& dir, std::unique_ptr<DataDir>* ret) {
}
std::unique_ptr<DataDir> p(
- new (std::nothrow) DataDir(path.path, path.capacity_bytes, path.storage_medium));
+ new (std::nothrow) DataDir(path.path, path.capacity_bytes, path.storage_medium, path.remote_path));
if (p == nullptr) {
std::cout << "new data dir failed" << std::endl;
return Status::InternalError("new data dir failed");
@@ -266,7 +266,8 @@ Status get_segment_footer(RandomAccessFile* input_file, SegmentFooterPB* footer)
}
uint8_t fixed_buf[12];
- RETURN_IF_ERROR(input_file->read_at(file_size - 12, Slice(fixed_buf, 12)));
+ Slice slice(fixed_buf, 12);
+ RETURN_IF_ERROR(input_file->read_at(file_size - 12, &slice));
// validate magic number
const char* k_segment_magic = "D0R1";
@@ -284,7 +285,8 @@ Status get_segment_footer(RandomAccessFile* input_file, SegmentFooterPB* footer)
}
std::string footer_buf;
footer_buf.resize(footer_length);
- RETURN_IF_ERROR(input_file->read_at(file_size - 12 - footer_length, footer_buf));
+ Slice slice2(footer_buf);
+ RETURN_IF_ERROR(input_file->read_at(file_size - 12 - footer_length, &slice2));
// validate footer PB's checksum
uint32_t expect_checksum = doris::decode_fixed32_le(fixed_buf + 4);
diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp
index 1b5dd0e..05619c2 100644
--- a/be/src/util/broker_storage_backend.cpp
+++ b/be/src/util/broker_storage_backend.cpp
@@ -96,6 +96,10 @@ Status BrokerStorageBackend::download(const std::string& remote, const std::stri
return Status::OK();
}
+Status BrokerStorageBackend::direct_download(const std::string& remote, std::string* content) {
+ return Status::IOError("broker direct_download not supported");
+}
+
Status BrokerStorageBackend::upload(const std::string& local, const std::string& remote) {
// read file and write to broker
FileHandler file_handler;
@@ -188,8 +192,12 @@ Status BrokerStorageBackend::rename(const std::string& orig_name, const std::str
return status;
}
-Status BrokerStorageBackend::list(const std::string& remote_path,
- std::map<std::string, FileStat>* files) {
+Status BrokerStorageBackend::rename_dir(const std::string& orig_name, const std::string& new_name) {
+ return rename(orig_name, new_name);
+}
+
+Status BrokerStorageBackend::list(const std::string& remote_path, bool contain_md5,
+ bool recursion, std::map<std::string, FileStat>* files) {
Status status = Status::OK();
BrokerServiceConnection client(client_cache(_env), _broker_addr, config::thrift_rpc_timeout_ms,
&status);
@@ -317,14 +325,26 @@ Status BrokerStorageBackend::rm(const std::string& remote) {
}
}
+Status BrokerStorageBackend::rmdir(const std::string& remote) {
+ return rm(remote);
+}
+
Status BrokerStorageBackend::copy(const std::string& src, const std::string& dst) {
return Status::NotSupported("copy not implemented!");
}
+Status BrokerStorageBackend::copy_dir(const std::string& src, const std::string& dst) {
+ return copy(src, dst);
+}
+
Status BrokerStorageBackend::mkdir(const std::string& path) {
return Status::NotSupported("mkdir not implemented!");
}
+Status BrokerStorageBackend::mkdirs(const std::string& path) {
+ return Status::NotSupported("mkdirs not implemented!");
+}
+
Status BrokerStorageBackend::exist(const std::string& path) {
Status status = Status::OK();
BrokerServiceConnection client(client_cache(_env), _broker_addr, config::thrift_rpc_timeout_ms,
@@ -368,6 +388,10 @@ Status BrokerStorageBackend::exist(const std::string& path) {
}
}
+Status BrokerStorageBackend::exist_dir(const std::string& path) {
+ return exist(path);
+}
+
Status BrokerStorageBackend::upload_with_checksum(const std::string& local,
const std::string& remote,
const std::string& checksum) {
diff --git a/be/src/util/broker_storage_backend.h b/be/src/util/broker_storage_backend.h
index 1dcfc10..8c011eb 100644
--- a/be/src/util/broker_storage_backend.h
+++ b/be/src/util/broker_storage_backend.h
@@ -30,16 +30,23 @@ public:
const std::map<std::string, std::string>& broker_prop);
~BrokerStorageBackend() {}
Status download(const std::string& remote, const std::string& local) override;
+ Status direct_download(const std::string& remote, std::string* content) override; // unsupported: always returns an error
Status upload(const std::string& local, const std::string& remote) override;
Status upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) override;
Status rename(const std::string& orig_name, const std::string& new_name) override;
- Status list(const std::string& remote_path, std::map<std::string, FileStat>* files) override;
+ Status rename_dir(const std::string& orig_name, const std::string& new_name) override; // delegates to rename()
+ Status list(const std::string& remote_path, bool contain_md5,
+ bool recursion, std::map<std::string, FileStat>* files) override;
Status direct_upload(const std::string& remote, const std::string& content) override;
Status rm(const std::string& remote) override;
+ Status rmdir(const std::string& remote) override; // delegates to rm()
Status copy(const std::string& src, const std::string& dst) override;
+ Status copy_dir(const std::string& src, const std::string& dst) override; // delegates to copy() (not implemented)
Status mkdir(const std::string& path) override;
+ Status mkdirs(const std::string& path) override; // not implemented
Status exist(const std::string& path) override;
+ Status exist_dir(const std::string& path) override; // delegates to exist()
private:
ExecEnv* _env;
diff --git a/be/src/util/coding.cpp b/be/src/util/coding.cpp
index 68237fa..e30df6a 100644
--- a/be/src/util/coding.cpp
+++ b/be/src/util/coding.cpp
@@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
-//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
diff --git a/be/src/util/coding.h b/be/src/util/coding.h
index c47070d..3cfa8e7 100644
--- a/be/src/util/coding.h
+++ b/be/src/util/coding.h
@@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
-//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index fd934b5..dd31a04 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -41,42 +41,7 @@ namespace doris {
using strings::Substitute;
Status FileUtils::create_dir(const std::string& path, Env* env) {
- if (path.empty()) {
- return Status::InvalidArgument(strings::Substitute("Unknown primitive type($0)", path));
- }
-
- std::filesystem::path p(path);
-
- std::string partial_path;
- for (std::filesystem::path::iterator it = p.begin(); it != p.end(); ++it) {
- partial_path = partial_path + it->string() + "/";
- bool is_dir = false;
-
- Status s = env->is_directory(partial_path, &is_dir);
-
- if (s.ok()) {
- if (is_dir) {
- // It's a normal directory.
- continue;
- }
-
- // Maybe a file or a symlink. Let's try to follow the symlink.
- std::string real_partial_path;
- RETURN_IF_ERROR(env->canonicalize(partial_path, &real_partial_path));
-
- RETURN_IF_ERROR(env->is_directory(real_partial_path, &is_dir));
- if (is_dir) {
- // It's a symlink to a directory.
- continue;
- } else {
- return Status::IOError(partial_path + " exists but is not a directory");
- }
- }
-
- RETURN_IF_ERROR(env->create_dir_if_missing(partial_path));
- }
-
- return Status::OK();
+ return env->create_dirs(path);
}
Status FileUtils::create_dir(const std::string& dir_path) {
@@ -84,32 +49,29 @@ Status FileUtils::create_dir(const std::string& dir_path) {
}
Status FileUtils::remove_all(const std::string& file_path) {
- std::filesystem::path boost_path(file_path);
- std::error_code ec;
- std::filesystem::remove_all(boost_path, ec);
- if (ec) {
- std::stringstream ss;
- ss << "remove all(" << file_path << ") failed, because: " << ec;
- return Status::InternalError(ss.str());
- }
- return Status::OK();
+ return Env::Default()->delete_dir(file_path);
+}
+
+Status FileUtils::remove_all(const std::string& path, TStorageMedium::type storage_medium) {
+ Env* env = Env::get_env(storage_medium);
+ return env->delete_dir(path);
}
-Status FileUtils::remove(const std::string& path, doris::Env* env) {
+Status FileUtils::remove(const std::string& path) {
+ if (!Env::Default()->path_exists(path).ok()) {
+ LOG(WARNING) << "path does not exist: " << path;
+ return Status::OK(); // nothing to remove
+ }
bool is_dir;
- RETURN_IF_ERROR(env->is_directory(path, &is_dir));
+ RETURN_IF_ERROR(Env::Default()->is_directory(path, &is_dir));
if (is_dir) {
- return env->delete_dir(path);
+ return Env::Default()->delete_dir(path);
} else {
- return env->delete_file(path);
+ return Env::Default()->delete_file(path);
}
}
-Status FileUtils::remove(const std::string& path) {
- return remove(path, Env::Default());
-}
-
Status FileUtils::remove_paths(const std::vector<std::string>& paths) {
for (const std::string& p : paths) {
RETURN_IF_ERROR(remove(p));
@@ -224,39 +186,7 @@ Status FileUtils::split_paths(const char* path, std::vector<std::string>* path_v
}
Status FileUtils::copy_file(const std::string& src_path, const std::string& dest_path) {
- // open src file
- FileHandler src_file;
- if (src_file.open(src_path.c_str(), O_RDONLY) != OLAP_SUCCESS) {
- char errmsg[64];
- LOG(ERROR) << "open file failed: " << src_path << strerror_r(errno, errmsg, 64);
- return Status::InternalError("Internal Error");
- }
- // create dest file and overwrite existing file
- FileHandler dest_file;
- if (dest_file.open_with_mode(dest_path.c_str(), O_CREAT | O_TRUNC | O_WRONLY, S_IRWXU) !=
- OLAP_SUCCESS) {
- char errmsg[64];
- LOG(ERROR) << "open file failed: " << dest_path << strerror_r(errno, errmsg, 64);
- return Status::InternalError("Internal Error");
- }
-
- const int64_t BUF_SIZE = 8192;
- std::unique_ptr<char[]> buf = std::make_unique<char[]>(BUF_SIZE);
- int64_t src_length = src_file.length();
- int64_t offset = 0;
- while (src_length > 0) {
- int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length;
- if (OLAP_SUCCESS != (src_file.pread(buf.get(), to_read, offset))) {
- return Status::InternalError("Internal Error");
- }
- if (OLAP_SUCCESS != (dest_file.pwrite(buf.get(), to_read, offset))) {
- return Status::InternalError("Internal Error");
- }
-
- offset += to_read;
- src_length -= to_read;
- }
- return Status::OK();
+ return Env::Default()->copy_path(src_path, dest_path);
}
Status FileUtils::md5sum(const std::string& file, std::string* md5sum) {
@@ -291,10 +221,6 @@ bool FileUtils::check_exist(const std::string& path) {
return Env::Default()->path_exists(path).ok();
}
-bool FileUtils::check_exist(const std::string& path, Env* env) {
- return env->path_exists(path).ok();
-}
-
Status FileUtils::canonicalize(const std::string& path, std::string* real_path) {
return Env::Default()->canonicalize(path, real_path);
}
diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h
index d0d8e8a..0e56ec8 100644
--- a/be/src/util/file_utils.h
+++ b/be/src/util/file_utils.h
@@ -23,6 +23,7 @@
#include <vector>
#include "common/status.h"
+#include "gen_cpp/Types_types.h"
namespace doris {
@@ -52,10 +53,9 @@ public:
static Status create_dir(const std::string& dir_path, Env* env);
// Delete file recursively.
- static Status remove_all(const std::string& dir_path);
+ static Status remove_all(const std::string& dir_path, TStorageMedium::type storage_medium);
- // Delete dir or file, failed when there are files or dirs under the path
- static Status remove(const std::string& path, Env* env);
+ static Status remove_all(const std::string& dir_path); // same as above, using Env::Default()
static Status remove(const std::string& path);
@@ -102,9 +102,6 @@ public:
// check path(file or directory) exist with default env
static bool check_exist(const std::string& path);
- // check path(file or directory) exist with env
- static bool check_exist(const std::string& path, Env* env);
-
// Canonicalize 'path' by applying the following conversions:
// - Converts a relative path into an absolute one using the cwd.
// - Converts '.' and '..' references.
diff --git a/be/src/util/path_util.cpp b/be/src/util/path_util.cpp
index a04f664..3de2f10 100644
--- a/be/src/util/path_util.cpp
+++ b/be/src/util/path_util.cpp
@@ -47,6 +47,17 @@ std::string join_path_segments(const string& a, const string& b) {
}
}
+// Returns a copy of path_desc with segment b joined onto filepath and, when set, remote_path.
+FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const string& b) {
+ FilePathDesc seg_path_desc = path_desc;
+ seg_path_desc.filepath = join_path_segments(path_desc.filepath, b);
+ // Leave an empty remote_path empty: joining onto an empty base yields no meaningful remote path.
+ if (!path_desc.remote_path.empty()) {
+ seg_path_desc.remote_path = join_path_segments(path_desc.remote_path, b);
+ }
+ return seg_path_desc;
+}
+
std::vector<string> join_path_segments_v(const std::vector<string>& v, const string& s) {
std::vector<string> out;
for (const string& path : v) {
diff --git a/be/src/util/path_util.h b/be/src/util/path_util.h
index 1376d2c..c7389d0 100644
--- a/be/src/util/path_util.h
+++ b/be/src/util/path_util.h
@@ -21,6 +21,8 @@
#include <string>
#include <vector>
+#include "env/env.h"
+
namespace doris {
namespace path_util {
@@ -36,6 +38,8 @@ std::string join_path_segments(const std::string& a, const std::string& b);
std::vector<std::string> join_path_segments_v(const std::vector<std::string>& v,
const std::string& s);
+FilePathDesc join_path_desc_segments(const FilePathDesc& path_desc, const std::string& b); // copy of path_desc with segment b appended
+
// Split a path into segments with the appropriate path separator.
std::vector<std::string> split_path(const std::string& path);
diff --git a/be/src/util/s3_storage_backend.cpp b/be/src/util/s3_storage_backend.cpp
index 5c786a9..95ab66b 100644
--- a/be/src/util/s3_storage_backend.cpp
+++ b/be/src/util/s3_storage_backend.cpp
@@ -24,9 +24,12 @@
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
+#include <boost/algorithm/string.hpp>
+#include <filesystem>
#include <fstream>
#include <iostream>
+#include <sstream>
#include "common/logging.h"
#include "gutil/strings/strip.h"
@@ -88,6 +91,22 @@ Status S3StorageBackend::download(const std::string& remote, const std::string&
return Status::OK();
}
+Status S3StorageBackend::direct_download(const std::string& remote, std::string* content) {
+ CHECK_S3_CLIENT(_client);
+ CHECK_S3_PATH(uri, remote);
+ Aws::S3::Model::GetObjectRequest request;
+ request.WithBucket(uri.get_bucket()).WithKey(uri.get_key());
+ Aws::S3::Model::GetObjectOutcome response = _client->GetObject(request);
+ if (response.IsSuccess()) {
+ std::stringstream ss;
+ ss << response.GetResult().GetBody().rdbuf();
+ *content = ss.str();
+ } else {
+ return Status::IOError("s3 direct_download error: " + error_msg(response));
+ }
+ return Status::OK();
+}
+
Status S3StorageBackend::upload(const std::string& local, const std::string& remote) {
CHECK_S3_CLIENT(_client);
CHECK_S3_PATH(uri, remote);
@@ -107,8 +126,8 @@ Status S3StorageBackend::upload(const std::string& local, const std::string& rem
RETRUN_S3_STATUS(response);
}
-Status S3StorageBackend::list(const std::string& remote_path,
- std::map<std::string, FileStat>* files) {
+Status S3StorageBackend::list(const std::string& remote_path, bool contain_md5,
+ bool recursion, std::map<std::string, FileStat>* files) {
std::string normal_str(remote_path);
if (!normal_str.empty() && normal_str.at(normal_str.size() - 1) != '/') {
normal_str += '/';
@@ -117,7 +136,10 @@ Status S3StorageBackend::list(const std::string& remote_path,
CHECK_S3_PATH(uri, normal_str);
Aws::S3::Model::ListObjectsRequest request;
- request.WithBucket(uri.get_bucket()).WithPrefix(uri.get_key()).WithDelimiter("/");
+ request.WithBucket(uri.get_bucket()).WithPrefix(uri.get_key());
+ if (!recursion) {
+ request.WithDelimiter("/");
+ }
Aws::S3::Model::ListObjectsOutcome response = _client->ListObjects(request);
if (response.IsSuccess()) {
Aws::Vector<Aws::S3::Model::Object> objects = response.GetResult().GetContents();
@@ -127,17 +149,19 @@ Status S3StorageBackend::list(const std::string& remote_path,
if (key.at(key.size() - 1) == '/') {
continue;
}
- size_t pos = key.find_last_of("/");
- if (pos != std::string::npos && pos != key.size() - 1) {
- key = std::string(key, pos + 1);
+ key = boost::algorithm::replace_first_copy(key, uri.get_key(), "");
+ if (contain_md5) {
+ size_t pos = key.find_last_of(".");
+ if (pos == std::string::npos || pos == key.size() - 1) {
+ // Not found checksum separator, ignore this file
+ continue;
+ }
+ FileStat stat = {std::string(key, 0, pos), std::string(key, pos + 1), object.GetSize()};
+ files->emplace(std::string(key, 0, pos), stat);
+ } else {
+ FileStat stat = {key, "", object.GetSize()};
+ files->emplace(key, stat);
}
- pos = key.find_last_of(".");
- if (pos == std::string::npos || pos == key.size() - 1) {
- // Not found checksum separator, ignore this file
- continue;
- }
- FileStat stat = {std::string(key, 0, pos), std::string(key, pos + 1), object.GetSize()};
- files->emplace(std::string(key, 0, pos), stat);
}
return Status::OK();
} else {
@@ -150,6 +174,11 @@ Status S3StorageBackend::rename(const std::string& orig_name, const std::string&
return rm(orig_name);
}
+Status S3StorageBackend::rename_dir(const std::string& orig_name, const std::string& new_name) {
+ RETURN_IF_ERROR(copy_dir(orig_name, new_name));
+ return rmdir(orig_name);
+}
+
Status S3StorageBackend::direct_upload(const std::string& remote, const std::string& content) {
CHECK_S3_CLIENT(_client);
CHECK_S3_PATH(uri, remote);
@@ -181,6 +210,23 @@ Status S3StorageBackend::rm(const std::string& remote) {
RETRUN_S3_STATUS(response);
}
+Status S3StorageBackend::rmdir(const std::string& remote) {
+ CHECK_S3_CLIENT(_client);
+ std::map<std::string, FileStat> files;
+ std::string normal_path(remote);
+ if (!normal_path.empty() && normal_path.at(normal_path.size() - 1) != '/') {
+ normal_path += '/';
+ }
+ LOG(INFO) << "Remove S3 dir: " << remote;
+ RETURN_IF_ERROR(list(normal_path, false, true, &files));
+
+ for (auto &file : files) {
+ std::string file_path = normal_path + file.second.name;
+ RETURN_IF_ERROR(rm(file_path));
+ }
+ return Status::OK();
+}
+
Status S3StorageBackend::copy(const std::string& src, const std::string& dst) {
CHECK_S3_CLIENT(_client);
CHECK_S3_PATH(src_uri, src);
@@ -195,17 +241,28 @@ Status S3StorageBackend::copy(const std::string& src, const std::string& dst) {
RETRUN_S3_STATUS(response);
}
-Status S3StorageBackend::mkdir(const std::string& path) {
- std::string normal_str(path);
- if (!normal_str.empty() && normal_str.at(normal_str.size() - 1) != '/') {
- normal_str += '/';
+// Copies every object under the "directory" src to the same relative key under dst.
+// S3 has no real directories: a directory is only a key prefix. Both prefixes are
+// normalized to end with '/' (the same convention rmdir() uses). This keeps listing "a/b"
+// from also picking up sibling keys such as "a/bc/x", and keeps a relative key joined to
+// the prefix from producing "//". Returns OK without copying when nothing is listed under src.
+Status S3StorageBackend::copy_dir(const std::string& src, const std::string& dst) {
+    std::string src_dir(src);
+    if (!src_dir.empty() && src_dir.back() != '/') {
+        src_dir += '/';
+    }
+    std::string dst_dir(dst);
+    if (!dst_dir.empty() && dst_dir.back() != '/') {
+        dst_dir += '/';
+    }
+    std::map<std::string, FileStat> files;
+    LOG(INFO) << "Copy S3 dir: " << src << " -> " << dst;
+    RETURN_IF_ERROR(list(src_dir, false, true, &files));
+    if (files.empty()) {
+        LOG(WARNING) << "Nothing need to copy: " << src << " -> " << dst;
+        return Status::OK();
}
- CHECK_S3_CLIENT(_client);
- CHECK_S3_PATH(uri, normal_str);
- Aws::S3::Model::PutObjectRequest request;
- request.WithBucket(uri.get_bucket()).WithKey(uri.get_key()).WithContentLength(0);
- Aws::S3::Model::PutObjectOutcome response = _client->PutObject(request);
- RETRUN_S3_STATUS(response);
+    for (const auto& kv : files) {
+        // With contain_md5 == false, list() keys each entry by its object key with the
+        // listed prefix stripped, i.e. relative to src_dir.
+        RETURN_IF_ERROR(copy(src_dir + kv.first, dst_dir + kv.first));
+    }
+    return Status::OK();
+}
+
+// Intentional no-op that always succeeds. S3 has no real directories: a "directory" is only
+// a key prefix, and it comes into being when an object is written under it. The previous
+// implementation wrote an empty placeholder object whose key ended in '/'. That left extra
+// keys for list() to return, so it is no longer done.
+Status S3StorageBackend::mkdir(const std::string& path) {
+ return Status::OK();
+}
+
+// Intentional no-op that always succeeds, for the same reason as mkdir(): S3 directories
+// are implicit key prefixes, so no intermediate "directories" need to be created.
+Status S3StorageBackend::mkdirs(const std::string& path) {
+ return Status::OK();
+}
Status S3StorageBackend::exist(const std::string& path) {
@@ -223,6 +280,15 @@ Status S3StorageBackend::exist(const std::string& path) {
}
}
+// Returns OK when at least one object exists under the "directory" path, NotFound otherwise.
+// S3 has no real directories, so this lists the key prefix. The prefix is normalized to end
+// with '/' (the same convention rmdir() uses), so "a/b" is not reported as existing merely
+// because a sibling key such as "a/bc/x" exists.
+Status S3StorageBackend::exist_dir(const std::string& path) {
+    std::string normal_path(path);
+    if (!normal_path.empty() && normal_path.back() != '/') {
+        normal_path += '/';
+    }
+    std::map<std::string, FileStat> files;
+    RETURN_IF_ERROR(list(normal_path, false, true, &files));
+    if (!files.empty()) {
+        return Status::OK();
+    }
+    return Status::NotFound(path + " not exists!");
+}
+
Status S3StorageBackend::upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) {
return upload(local, remote + "." + checksum);
diff --git a/be/src/util/s3_storage_backend.h b/be/src/util/s3_storage_backend.h
index 4518227..2850adf 100644
--- a/be/src/util/s3_storage_backend.h
+++ b/be/src/util/s3_storage_backend.h
@@ -32,16 +32,23 @@ public:
S3StorageBackend(const std::map<std::string, std::string>& prop);
~S3StorageBackend();
Status download(const std::string& remote, const std::string& local) override;
+ Status direct_download(const std::string& remote, std::string* content) override;
Status upload(const std::string& local, const std::string& remote) override;
Status upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) override;
- Status list(const std::string& remote_path, std::map<std::string, FileStat>* files) override;
+ Status list(const std::string& remote_path, bool contain_md5,
+ bool recursion, std::map<std::string, FileStat>* files) override;
Status rename(const std::string& orig_name, const std::string& new_name) override;
+ Status rename_dir(const std::string& orig_name, const std::string& new_name) override;
Status direct_upload(const std::string& remote, const std::string& content) override;
Status rm(const std::string& remote) override;
+ Status rmdir(const std::string& remote) override;
Status copy(const std::string& src, const std::string& dst) override;
+ Status copy_dir(const std::string& src, const std::string& dst) override;
Status mkdir(const std::string& path) override;
+ Status mkdirs(const std::string& path) override;
Status exist(const std::string& path) override;
+ Status exist_dir(const std::string& path) override;
private:
template <typename AwsOutcome>
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index 160b5f4..58fdc46 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -26,10 +26,6 @@
namespace doris {
-const static std::string S3_AK = "AWS_ACCESS_KEY";
-const static std::string S3_SK = "AWS_SECRET_KEY";
-const static std::string S3_ENDPOINT = "AWS_ENDPOINT";
-const static std::string S3_REGION = "AWS_REGION";
const static std::string USE_PATH_STYLE = "use_path_style";
ClientFactory::ClientFactory() {
@@ -52,6 +48,18 @@ ClientFactory& ClientFactory::instance() {
return ret;
}
+// Returns true when prop contains every property required to build an S3 client
+// (S3_AK, S3_SK, S3_ENDPOINT, S3_REGION). Keys are looked up through StringCaseMap,
+// presumably case-insensitively; confirm against its definition.
+// On failure the first missing key is logged and false is returned. This is a recoverable
+// check whose result callers act on, so it must not abort debug builds via DCHECK.
+bool ClientFactory::is_s3_conf_valid(const std::map<std::string, std::string>& prop) {
+    StringCaseMap<std::string> properties(prop.begin(), prop.end());
+    for (const auto& required_key : {S3_AK, S3_SK, S3_ENDPOINT, S3_REGION}) {
+        if (properties.find(required_key) == properties.end()) {
+            LOG(ERROR) << "aws properties is incorrect. missing: " << required_key;
+            return false;
+        }
+    }
+    return true;
+}
+
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
const std::map<std::string, std::string>& prop) {
StringCaseMap<std::string> properties(prop.begin(), prop.end());
@@ -68,6 +76,15 @@ std::shared_ptr<Aws::S3::S3Client> ClientFactory::create(
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = properties.find(S3_ENDPOINT)->second;
aws_config.region = properties.find(S3_REGION)->second;
+ if (properties.find(S3_MAX_CONN_SIZE) != properties.end()) {
+ aws_config.maxConnections = std::atoi(properties.find(S3_MAX_CONN_SIZE)->second.c_str());
+ }
+ if (properties.find(S3_REQUEST_TIMEOUT_MS) != properties.end()) {
+ aws_config.requestTimeoutMs = std::atoi(properties.find(S3_REQUEST_TIMEOUT_MS)->second.c_str());
+ }
+ if (properties.find(S3_CONN_TIMEOUT_MS) != properties.end()) {
+ aws_config.connectTimeoutMs = std::atoi(properties.find(S3_CONN_TIMEOUT_MS)->second.c_str());
+ }
// See https://sdk.amazonaws.com/cpp/api/LATEST/class_aws_1_1_s3_1_1_s3_client.html
bool use_virtual_addressing = true;
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 5fe383c..e4baa97 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -30,6 +30,15 @@ class S3Client;
} // namespace Aws
namespace doris {
+
+// Property keys understood by ClientFactory. These are C++17 `inline` variables, so every
+// translation unit that includes this header shares one definition. A header-level
+// `const static` would give each TU its own internal-linkage copy.
+inline const std::string S3_AK = "AWS_ACCESS_KEY";
+inline const std::string S3_SK = "AWS_SECRET_KEY";
+inline const std::string S3_ENDPOINT = "AWS_ENDPOINT";
+inline const std::string S3_REGION = "AWS_REGION";
+inline const std::string S3_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
+inline const std::string S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
+inline const std::string S3_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
+
class ClientFactory {
public:
~ClientFactory();
@@ -38,6 +47,7 @@ public:
std::shared_ptr<Aws::S3::S3Client> create(const std::map<std::string, std::string>& prop);
+ static bool is_s3_conf_valid(const std::map<std::string, std::string>& prop);
private:
ClientFactory();
diff --git a/be/src/util/storage_backend.h b/be/src/util/storage_backend.h
index 131131c..a05acb2 100644
--- a/be/src/util/storage_backend.h
+++ b/be/src/util/storage_backend.h
@@ -30,16 +30,23 @@ struct FileStat {
class StorageBackend {
public:
virtual Status download(const std::string& remote, const std::string& local) = 0;
+ // Presumably reads the whole remote object into *content (the counterpart of
+ // direct_upload); TODO confirm per backend.
+ virtual Status direct_download(const std::string& remote, std::string* content) = 0;
virtual Status upload(const std::string& local, const std::string& remote) = 0;
virtual Status upload_with_checksum(const std::string& local, const std::string& remote,
const std::string& checksum) = 0;
- virtual Status list(const std::string& remote_path, std::map<std::string, FileStat>* files) = 0;
+ // Lists objects under remote_path into *files, keyed by name relative to remote_path.
+ // contain_md5: when true, entries are expected to be named "<name>.<md5>". The S3 backend
+ // splits at the last '.' and skips keys without one. When false, the key is used as-is
+ // with an empty md5. recursion: presumably whether nested prefixes are included; TODO confirm.
+ virtual Status list(const std::string& remote_path, bool contain_md5,
+ bool recursion, std::map<std::string, FileStat>* files) = 0;
virtual Status rename(const std::string& orig_name, const std::string& new_name) = 0;
+ // Directory-level variants of rename/copy/rm/mkdir/exist. On S3, which has no real
+ // directories, mkdir/mkdirs are no-ops and the others operate on the key prefix.
+ virtual Status rename_dir(const std::string& orig_name, const std::string& new_name) = 0;
virtual Status direct_upload(const std::string& remote, const std::string& content) = 0;
virtual Status copy(const std::string& src, const std::string& dst) = 0;
+ virtual Status copy_dir(const std::string& src, const std::string& dst) = 0;
virtual Status rm(const std::string& remote) = 0;
+ virtual Status rmdir(const std::string& remote) = 0;
virtual Status mkdir(const std::string& path) = 0;
+ virtual Status mkdirs(const std::string& path) = 0;
virtual Status exist(const std::string& path) = 0;
+ virtual Status exist_dir(const std::string& path) = 0;
virtual ~StorageBackend() = default;
};
diff --git a/be/test/env/env_posix_test.cpp b/be/test/env/env_posix_test.cpp
index 64ee0e8..e2124f3 100644
--- a/be/test/env/env_posix_test.cpp
+++ b/be/test/env/env_posix_test.cpp
@@ -36,6 +36,17 @@ public:
void TearDown() override {}
};
+// Streaming path segments into a FilePathDescStream seeded with a FilePathDesc must append
+// them to both the local filepath and the remote_path of the resulting descriptor.
+TEST_F(EnvPosixTest, file_path_desc) {
+ FilePathDesc path_desc("/local");
+ path_desc.storage_medium = TStorageMedium::S3;
+ path_desc.remote_path = "/remote";
+ FilePathDescStream path_desc_stream;
+ // String and integer segments are concatenated verbatim; no separator is inserted.
+ path_desc_stream << path_desc << "/test" << "/" << 1;
+ FilePathDesc dest_path_desc = path_desc_stream.path_desc();
+ ASSERT_EQ("/local/test/1", dest_path_desc.filepath);
+ ASSERT_EQ("/remote/test/1", dest_path_desc.remote_path);
+}
+
TEST_F(EnvPosixTest, random_access) {
std::string fname = "./ut_dir/env_posix/random_access";
WritableFileOptions ops;
@@ -90,12 +101,12 @@ TEST_F(EnvPosixTest, random_access) {
ASSERT_STREQ("abc", std::string(slice3.data, slice3.size).c_str());
Slice slice4(mem, 3);
- st = rfile->read_at(112, slice4);
+ st = rfile->read_at(112, &slice4);
ASSERT_TRUE(st.ok());
ASSERT_STREQ("bcd", std::string(slice4.data, slice4.size).c_str());
// end of file
- st = rfile->read_at(114, slice4);
+ st = rfile->read_at(114, &slice4);
ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
LOG(INFO) << "st=" << st.to_string();
}
@@ -160,11 +171,11 @@ TEST_F(EnvPosixTest, random_rw) {
ASSERT_STREQ("789", std::string(slice3.data, slice3.size).c_str());
Slice slice4(mem, 100);
- st = rfile->read_at(9, slice4);
+ st = rfile->read_at(9, &slice4);
ASSERT_TRUE(st.ok());
// end of file
- st = rfile->read_at(102, slice4);
+ st = rfile->read_at(102, &slice4);
ASSERT_EQ(TStatusCode::END_OF_FILE, st.code());
LOG(INFO) << "st=" << st.to_string();
}
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index bc61071..0d9803b 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -33,7 +33,7 @@ public:
void SetUp() {
_tablet_meta = static_cast<TabletMetaSharedPtr>(
new TabletMeta(1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK));
+ TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD));
_json_rowset_meta = R"({
"rowset_id": 540081,
@@ -323,7 +323,7 @@ public:
_tablet_meta = static_cast<TabletMetaSharedPtr>(
new TabletMeta(1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK));
+ TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD));
_json_rowset_meta = R"({
"rowset_id": 540081,
diff --git a/be/test/olap/delete_handler_test.cpp b/be/test/olap/delete_handler_test.cpp
index 22d7fe5..f050fb6 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -259,16 +259,16 @@ protected:
ASSERT_EQ(OLAP_SUCCESS, res);
tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
_create_tablet.tablet_schema.schema_hash);
- ASSERT_TRUE(tablet.get() != nullptr);
- _tablet_path = tablet->tablet_path();
+ ASSERT_TRUE(tablet.get() != NULL);
+ _tablet_path = tablet->tablet_path_desc().filepath;
set_create_duplicate_tablet_request(&_create_dup_tablet);
res = k_engine->create_tablet(_create_dup_tablet);
ASSERT_EQ(OLAP_SUCCESS, res);
dup_tablet = k_engine->tablet_manager()->get_tablet(
_create_dup_tablet.tablet_id, _create_dup_tablet.tablet_schema.schema_hash);
- ASSERT_TRUE(dup_tablet.get() != nullptr);
- _dup_tablet_path = tablet->tablet_path();
+ ASSERT_TRUE(dup_tablet.get() != NULL);
+ _dup_tablet_path = tablet->tablet_path_desc().filepath;
}
void TearDown() {
@@ -431,7 +431,7 @@ protected:
tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
_create_tablet.tablet_schema.schema_hash);
ASSERT_TRUE(tablet.get() != nullptr);
- _tablet_path = tablet->tablet_path();
+ _tablet_path = tablet->tablet_path_desc().filepath;
}
void TearDown() {
@@ -798,7 +798,7 @@ protected:
tablet = k_engine->tablet_manager()->get_tablet(_create_tablet.tablet_id,
_create_tablet.tablet_schema.schema_hash);
ASSERT_TRUE(tablet != nullptr);
- _tablet_path = tablet->tablet_path();
+ _tablet_path = tablet->tablet_path_desc().filepath;
_data_row_cursor.init(tablet->tablet_schema());
_data_row_cursor.allocate_memory_for_string_type(tablet->tablet_schema());
diff --git a/be/test/olap/file_utils_test.cpp b/be/test/olap/file_utils_test.cpp
index d2c744e..fbcc32a 100644
--- a/be/test/olap/file_utils_test.cpp
+++ b/be/test/olap/file_utils_test.cpp
@@ -115,10 +115,6 @@ TEST_F(FileUtilsTest, TestRemove) {
ASSERT_TRUE(FileUtils::create_dir("./file_test/abc/123").ok());
save_string_file("./file_test/abc/123/s2", "123");
- ASSERT_FALSE(FileUtils::remove("./file_test").ok());
- ASSERT_FALSE(FileUtils::remove("./file_test/abc/").ok());
- ASSERT_FALSE(FileUtils::remove("./file_test/abc/123").ok());
-
ASSERT_TRUE(FileUtils::check_exist("./file_test/abc/123/s2"));
ASSERT_TRUE(FileUtils::remove("./file_test/abc/123/s2").ok());
ASSERT_FALSE(FileUtils::check_exist("./file_test/abc/123/s2"));
@@ -149,9 +145,9 @@ TEST_F(FileUtilsTest, TestRemove) {
ps.push_back("./file_test/s1");
ps.push_back("./file_test/abc/def");
- ASSERT_FALSE(FileUtils::remove_paths(ps).ok());
+ ASSERT_TRUE(FileUtils::remove_paths(ps).ok());
ASSERT_FALSE(FileUtils::check_exist("./file_test/s1"));
- ASSERT_TRUE(FileUtils::check_exist("./file_test/abc/def/"));
+ ASSERT_FALSE(FileUtils::check_exist("./file_test/abc/def/"));
ps.clear();
ps.push_back("./file_test/abc/def/zxc");
diff --git a/be/test/olap/fs/file_block_manager_test.cpp b/be/test/olap/fs/file_block_manager_test.cpp
index 65cecec..86fb6d9 100644
--- a/be/test/olap/fs/file_block_manager_test.cpp
+++ b/be/test/olap/fs/file_block_manager_test.cpp
@@ -56,7 +56,7 @@ TEST_F(FileBlockManagerTest, NormalTest) {
std::unique_ptr<fs::WritableBlock> wblock;
std::string fname = kBlockManagerDir + "/test_file";
- fs::CreateBlockOptions wblock_opts({fname});
+ fs::CreateBlockOptions wblock_opts(fname);
Status st = fbm->create_block(wblock_opts, &wblock);
ASSERT_TRUE(st.ok()) << st.get_error_msg();
@@ -64,8 +64,10 @@ TEST_F(FileBlockManagerTest, NormalTest) {
wblock->append(data);
wblock->close();
+ FilePathDesc path_desc;
+ path_desc.filepath = fname;
std::unique_ptr<fs::ReadableBlock> rblock;
- st = fbm->open_block(fname, &rblock);
+ st = fbm->open_block(path_desc, &rblock);
uint64_t file_size = 0;
ASSERT_TRUE(rblock->size(&file_size).ok());
ASSERT_EQ(data.size(), file_size);
diff --git a/be/test/olap/memory/mem_tablet_test.cpp b/be/test/olap/memory/mem_tablet_test.cpp
index ad4b297..04cc727 100644
--- a/be/test/olap/memory/mem_tablet_test.cpp
+++ b/be/test/olap/memory/mem_tablet_test.cpp
@@ -69,7 +69,8 @@ TEST(MemTablet, writescan) {
tschema.__set_schema_hash(1);
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1, 1, 1, 1, 1, tschema, static_cast<uint32_t>(sc->cid_size()),
- col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY));
+ col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY,
+ TStorageMedium::HDD));
std::shared_ptr<MemTablet> tablet = MemTablet::create_tablet_from_meta(tablet_meta, nullptr);
ASSERT_TRUE(tablet->init().ok());
diff --git a/be/test/olap/olap_snapshot_converter_test.cpp b/be/test/olap/olap_snapshot_converter_test.cpp
index b91c911..ab5a4e4 100644
--- a/be/test/olap/olap_snapshot_converter_test.cpp
+++ b/be/test/olap/olap_snapshot_converter_test.cpp
@@ -74,7 +74,7 @@ public:
if (std::filesystem::exists(tmp_data_path)) {
std::filesystem::remove_all(tmp_data_path);
}
- copy_dir(test_engine_data_path, tmp_data_path);
+ FileUtils::copy_file(test_engine_data_path, tmp_data_path);
_tablet_id = 15007;
_schema_hash = 368169781;
_tablet_data_path = tmp_data_path + "/" + std::to_string(0) + "/" +
diff --git a/be/test/olap/rowset/alpha_rowset_test.cpp b/be/test/olap/rowset/alpha_rowset_test.cpp
index 4229460..609a9a7 100644
--- a/be/test/olap/rowset/alpha_rowset_test.cpp
+++ b/be/test/olap/rowset/alpha_rowset_test.cpp
@@ -81,7 +81,7 @@ void create_rowset_writer_context(TabletSchema* tablet_schema,
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = ALPHA_ROWSET;
- rowset_writer_context->rowset_path_prefix = config::storage_root_path + "/data/0/12345/1111";
+ rowset_writer_context->path_desc.filepath = config::storage_root_path + "/data/0/12345/1111";
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 0;
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index 54973c7..7f8190f 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -133,7 +133,7 @@ protected:
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = BETA_ROWSET;
- rowset_writer_context->rowset_path_prefix = "./data_test/data/beta_rowset_test";
+ rowset_writer_context->path_desc.filepath = "./data_test/data/beta_rowset_test";
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 10;
diff --git a/be/test/olap/rowset/rowset_converter_test.cpp b/be/test/olap/rowset/rowset_converter_test.cpp
index bb3f193..ca6feba 100644
--- a/be/test/olap/rowset/rowset_converter_test.cpp
+++ b/be/test/olap/rowset/rowset_converter_test.cpp
@@ -61,7 +61,7 @@ void create_rowset_writer_context(TabletSchema* tablet_schema, RowsetTypePB dst_
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = dst_type;
- rowset_writer_context->rowset_path_prefix = config::storage_root_path + "/data/0/12345/1111";
+ rowset_writer_context->path_desc.filepath = config::storage_root_path + "/data/0/12345/1111";
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 0;
@@ -235,14 +235,16 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type)
create_tablet_meta(&tablet_schema, tablet_meta.get());
RowsetConverter rowset_converter(tablet_meta);
RowsetMetaPB dst_rowset_meta_pb;
+ FilePathDesc schema_hash_path_desc;
+ schema_hash_path_desc.filepath = _schema_hash_path;
if (dst_type == BETA_ROWSET) {
ASSERT_EQ(OLAP_SUCCESS,
rowset_converter.convert_alpha_to_beta(src_rowset->rowset_meta(),
- _schema_hash_path, &dst_rowset_meta_pb));
+ schema_hash_path_desc, &dst_rowset_meta_pb));
} else {
ASSERT_EQ(OLAP_SUCCESS,
rowset_converter.convert_beta_to_alpha(src_rowset->rowset_meta(),
- _schema_hash_path, &dst_rowset_meta_pb));
+ schema_hash_path_desc, &dst_rowset_meta_pb));
}
ASSERT_EQ(dst_type, dst_rowset_meta_pb.rowset_type());
@@ -253,7 +255,7 @@ void RowsetConverterTest::process(RowsetTypePB src_type, RowsetTypePB dst_type)
RowsetMetaSharedPtr dst_rowset_meta(new RowsetMeta());
ASSERT_TRUE(dst_rowset_meta->init_from_pb(dst_rowset_meta_pb));
RowsetSharedPtr dst_rowset;
- ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(&tablet_schema, _schema_hash_path,
+ ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(&tablet_schema, schema_hash_path_desc,
dst_rowset_meta, &dst_rowset));
RowsetReaderSharedPtr dst_rowset_reader;
diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
index 6211997..be1a8e3 100644
--- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
@@ -65,8 +65,8 @@ void write_index_file(std::string& filename, const void* values, size_t value_co
const TypeInfo* type_info = get_type_info(type);
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({filename});
- ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
+ fs::CreateBlockOptions opts(filename);
+ ASSERT_TRUE(fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock).ok());
std::unique_ptr<BitmapIndexWriter> writer;
BitmapIndexWriter::create(type_info, &writer);
diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
index 3cb0c3a..5a86835 100644
--- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
@@ -59,8 +59,8 @@ void write_bloom_filter_index_file(const std::string& file_name, const void* val
std::string fname = dname + "/" + file_name;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({fname});
- Status st = fs::fs_util::block_manager()->create_block(opts, &wblock);
+ fs::CreateBlockOptions opts(fname);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock);
ASSERT_TRUE(st.ok()) << st.to_string();
std::unique_ptr<BloomFilterIndexWriter> bloom_filter_index_writer;
diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
index 4f0990c..f5978f1 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
@@ -77,8 +77,8 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows,
std::string fname = TEST_DIR + "/" + test_name;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({fname});
- Status st = fs::fs_util::block_manager()->create_block(opts, &wblock);
+ fs::CreateBlockOptions opts(fname);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock);
ASSERT_TRUE(st.ok()) << st.get_error_msg();
ColumnWriterOptions writer_opts;
@@ -125,16 +125,18 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows,
{
// read and check
ColumnReaderOptions reader_opts;
+ FilePathDesc path_desc;
+ path_desc.filepath = fname;
std::unique_ptr<ColumnReader> reader;
- auto st = ColumnReader::create(reader_opts, meta, num_rows, fname, &reader);
+ auto st = ColumnReader::create(reader_opts, meta, num_rows, path_desc, &reader);
ASSERT_TRUE(st.ok());
ColumnIterator* iter = nullptr;
st = reader->new_iterator(&iter);
ASSERT_TRUE(st.ok());
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_manager = fs::fs_util::block_manager();
- block_manager->open_block(fname, &rblock);
+ fs::BlockManager* block_manager = fs::fs_util::block_manager(TStorageMedium::HDD);
+ block_manager->open_block(path_desc, &rblock);
ASSERT_TRUE(st.ok());
ColumnIteratorOptions iter_opts;
@@ -238,8 +240,8 @@ void test_array_nullable_data(CollectionValue* src_data, uint8_t* src_is_null, i
std::string fname = TEST_DIR + "/" + test_name;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({fname});
- Status st = fs::fs_util::block_manager()->create_block(opts, &wblock);
+ fs::CreateBlockOptions opts(fname);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock);
ASSERT_TRUE(st.ok()) << st.get_error_msg();
ColumnWriterOptions writer_opts;
@@ -289,16 +291,18 @@ void test_array_nullable_data(CollectionValue* src_data, uint8_t* src_is_null, i
// read and check
{
ColumnReaderOptions reader_opts;
+ FilePathDesc path_desc;
+ path_desc.filepath = fname;
std::unique_ptr<ColumnReader> reader;
- auto st = ColumnReader::create(reader_opts, meta, num_rows, fname, &reader);
+ auto st = ColumnReader::create(reader_opts, meta, num_rows, path_desc, &reader);
ASSERT_TRUE(st.ok());
ColumnIterator* iter = nullptr;
st = reader->new_iterator(&iter);
ASSERT_TRUE(st.ok());
std::unique_ptr<fs::ReadableBlock> rblock;
- fs::BlockManager* block_manager = fs::fs_util::block_manager();
- st = block_manager->open_block(fname, &rblock);
+ fs::BlockManager* block_manager = fs::fs_util::block_manager(TStorageMedium::HDD);
+ st = block_manager->open_block(path_desc, &rblock);
ASSERT_TRUE(st.ok());
ColumnIteratorOptions iter_opts;
OlapReaderStatistics stats;
diff --git a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
index 9c4fd85..4802f3f 100644
--- a/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/ordinal_page_index_test.cpp
@@ -62,8 +62,8 @@ TEST_F(OrdinalPageIndexTest, normal) {
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({filename});
- ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
+ fs::CreateBlockOptions opts(filename);
+ ASSERT_TRUE(fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ORDINAL_INDEX, index_meta.type());
@@ -73,7 +73,8 @@ TEST_F(OrdinalPageIndexTest, normal) {
<< index_meta.ordinal_index().root_page().root_page().size();
}
- OrdinalIndexReader index(filename, &index_meta.ordinal_index(), 16 * 1024 * 4096 + 1);
+ FilePathDesc path_desc(filename);
+ OrdinalIndexReader index(path_desc, &index_meta.ordinal_index(), 16 * 1024 * 4096 + 1);
ASSERT_TRUE(index.load(true, false).ok());
ASSERT_EQ(16 * 1024, index.num_data_pages());
ASSERT_EQ(1, index.get_first_ordinal(0));
@@ -127,7 +128,8 @@ TEST_F(OrdinalPageIndexTest, one_data_page) {
ASSERT_EQ(data_page_pointer, root_page_pointer);
}
- OrdinalIndexReader index("", &index_meta.ordinal_index(), num_values);
+ FilePathDesc path_desc;
+ OrdinalIndexReader index(path_desc, &index_meta.ordinal_index(), num_values);
ASSERT_TRUE(index.load(true, false).ok());
ASSERT_EQ(1, index.num_data_pages());
ASSERT_EQ(0, index.get_first_ordinal(0));
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp
index c1f653c3..1f8f45d 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -111,8 +111,8 @@ protected:
// the wrong answer (it use (filename,offset) as cache key)
std::string filename = strings::Substitute("$0/seg_$1.dat", kSegmentDir, seg_id++);
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions block_opts({filename});
- Status st = fs::fs_util::block_manager()->create_block(block_opts, &wblock);
+ fs::CreateBlockOptions block_opts(filename);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(block_opts, &wblock);
ASSERT_TRUE(st.ok());
SegmentWriter writer(wblock.get(), 0, &build_schema, opts);
st = writer.init(10);
@@ -136,7 +136,9 @@ protected:
ASSERT_TRUE(st.ok());
ASSERT_TRUE(wblock->close().ok());
- st = Segment::open(filename, 0, &query_schema, res);
+ FilePathDesc path_desc;
+ path_desc.filepath = filename;
+ st = Segment::open(path_desc, 0, &query_schema, res);
ASSERT_TRUE(st.ok());
ASSERT_EQ(nrows, (*res)->num_rows());
}
@@ -613,8 +615,8 @@ TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
std::string fname = dname + "/int_case";
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions wblock_opts({fname});
- Status st = fs::fs_util::block_manager()->create_block(wblock_opts, &wblock);
+ fs::CreateBlockOptions wblock_opts(fname);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(wblock_opts, &wblock);
ASSERT_TRUE(st.ok()) << st.to_string();
SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
st = writer.init(10);
@@ -783,8 +785,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
std::string fname = dname + "/string_case";
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions wblock_opts({fname});
- Status st = fs::fs_util::block_manager()->create_block(wblock_opts, &wblock);
+ fs::CreateBlockOptions wblock_opts(fname);
+ Status st = fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(wblock_opts, &wblock);
ASSERT_TRUE(st.ok());
SegmentWriter writer(wblock.get(), 0, tablet_schema.get(), opts);
st = writer.init(10);
@@ -817,7 +819,9 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
{
std::shared_ptr<Segment> segment;
- st = Segment::open(fname, 0, tablet_schema.get(), &segment);
+ FilePathDesc path_desc;
+ path_desc.filepath = fname;
+ st = Segment::open(path_desc, 0, tablet_schema.get(), &segment);
ASSERT_TRUE(st.ok());
ASSERT_EQ(4096, segment->num_rows());
Schema schema(*tablet_schema);
diff --git a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
index 3f11e69..ce19c92 100644
--- a/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/zone_map_index_test.cpp
@@ -74,8 +74,8 @@ public:
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({filename});
- ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
+ fs::CreateBlockOptions opts(filename);
+ ASSERT_TRUE(fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
ASSERT_TRUE(wblock->close().ok());
@@ -118,8 +118,8 @@ public:
ColumnIndexMetaPB index_meta;
{
std::unique_ptr<fs::WritableBlock> wblock;
- fs::CreateBlockOptions opts({filename});
- ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
+ fs::CreateBlockOptions opts(filename);
+ ASSERT_TRUE(fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
ASSERT_TRUE(wblock->close().ok());
@@ -169,7 +169,7 @@ TEST_F(ColumnZoneMapTest, NormalTestIntPage) {
{
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions opts({filename});
- ASSERT_TRUE(fs::fs_util::block_manager()->create_block(opts, &wblock).ok());
+ ASSERT_TRUE(fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(opts, &wblock).ok());
ASSERT_TRUE(builder.finish(wblock.get(), &index_meta).ok());
ASSERT_EQ(ZONE_MAP_INDEX, index_meta.type());
ASSERT_TRUE(wblock->close().ok());
diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp
index aea3e84..001651f 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -27,7 +27,7 @@ TEST(TabletMetaTest, SaveAndParse) {
std::string meta_path = "./be/test/olap/test_data/tablet_meta_test.hdr";
TabletMeta old_tablet_meta(1, 2, 3, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK);
+ TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD);
ASSERT_EQ(OLAP_SUCCESS, old_tablet_meta.save(meta_path));
{
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index 9a5eb86..30d418f 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -108,7 +108,7 @@ TEST_F(TabletMgrTest, CreateTablet) {
TabletSharedPtr tablet = _tablet_mgr->get_tablet(111, 3333);
ASSERT_TRUE(tablet != nullptr);
// check dir exist
- bool dir_exist = FileUtils::check_exist(tablet->tablet_path());
+ bool dir_exist = FileUtils::check_exist(tablet->tablet_path_desc().filepath);
ASSERT_TRUE(dir_exist);
// check meta has this tablet
TabletMetaSharedPtr new_tablet_meta(new TabletMeta());
@@ -174,8 +174,8 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) {
TabletSharedPtr tablet = _tablet_mgr->get_tablet(111, 3333);
ASSERT_TRUE(tablet != nullptr);
// check dir exist
- bool dir_exist = FileUtils::check_exist(tablet->tablet_path());
- ASSERT_TRUE(dir_exist) << tablet->tablet_path();
+ bool dir_exist = FileUtils::check_exist(tablet->tablet_path_desc().filepath);
+ ASSERT_TRUE(dir_exist) << tablet->tablet_path_desc().filepath;
// check meta has this tablet
TabletMetaSharedPtr new_tablet_meta(new TabletMeta());
OLAPStatus check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333, new_tablet_meta);
@@ -230,7 +230,7 @@ TEST_F(TabletMgrTest, DropTablet) {
ASSERT_TRUE(tablet != nullptr);
// check dir exist
- std::string tablet_path = tablet->tablet_path();
+ std::string tablet_path = tablet->tablet_path_desc().filepath;
bool dir_exist = FileUtils::check_exist(tablet_path);
ASSERT_TRUE(dir_exist);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 0e5d57b..01aac86 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -37,7 +37,7 @@ public:
virtual void SetUp() {
_tablet_meta = static_cast<TabletMetaSharedPtr>(
new TabletMeta(1, 2, 15673, 4, 5, TTabletSchema(), 6, {{7, 8}}, UniqueId(9, 10),
- TTabletType::TABLET_TYPE_DISK));
+ TTabletType::TABLET_TYPE_DISK, TStorageMedium::HDD));
_json_rowset_meta = R"({
"rowset_id": 540081,
"tablet_id": 15673,
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index 69d20ba..f32a64a 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -133,9 +133,11 @@ public:
RowsetMetaSharedPtr rowset_meta(new AlphaRowsetMeta());
rowset_meta->init_from_json(_json_rowset_meta);
ASSERT_EQ(rowset_meta->rowset_id(), rowset_id);
- ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path,
+ FilePathDesc rowset_meta_path_desc;
+ rowset_meta_path_desc.filepath = rowset_meta_path;
+ ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path_desc,
rowset_meta, &_alpha_rowset));
- ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path,
+ ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path_desc,
rowset_meta, &_alpha_rowset_same_id));
// init rowset meta 2
@@ -152,7 +154,9 @@ public:
RowsetMetaSharedPtr rowset_meta2(new AlphaRowsetMeta());
rowset_meta2->init_from_json(_json_rowset_meta);
ASSERT_EQ(rowset_meta2->rowset_id(), rowset_id);
- ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path_2,
+ FilePathDesc rowset_meta_path_desc_2;
+ rowset_meta_path_desc_2.filepath = rowset_meta_path_2;
+ ASSERT_EQ(OLAP_SUCCESS, RowsetFactory::create_rowset(_schema.get(), rowset_meta_path_desc_2,
rowset_meta2, &_alpha_rowset_diff_id));
_tablet_uid = TabletUid(10, 10);
}
diff --git a/be/test/plugin/plugin_zip_test.cpp b/be/test/plugin/plugin_zip_test.cpp
index 810026b..2ee1eba 100644
--- a/be/test/plugin/plugin_zip_test.cpp
+++ b/be/test/plugin/plugin_zip_test.cpp
@@ -96,7 +96,7 @@ TEST_F(PluginZipTest, local_normal) {
char f[11];
Slice s(f, 11);
- file->read_at(0, s);
+ file->read_at(0, &s);
ASSERT_EQ("hello world", s.to_string());
@@ -119,7 +119,7 @@ TEST_F(PluginZipTest, http_normal) {
char f[11];
Slice s(f, 11);
- file->read_at(0, s);
+ file->read_at(0, &s);
ASSERT_EQ("hello world", s.to_string());
diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp
index 1e8d4e2..fea489a 100644
--- a/be/test/tools/benchmark_tool.cpp
+++ b/be/test/tools/benchmark_tool.cpp
@@ -337,7 +337,7 @@ public:
std::string filename = strings::Substitute("$0/seg_$1.dat", kSegmentDir, ++seg_id);
std::unique_ptr<fs::WritableBlock> wblock;
fs::CreateBlockOptions block_opts({filename});
- fs::fs_util::block_manager()->create_block(block_opts, &wblock);
+ fs::fs_util::block_manager(TStorageMedium::HDD)->create_block(block_opts, &wblock);
SegmentWriterOptions opts;
SegmentWriter writer(wblock.get(), 0, &_tablet_schema, opts);
writer.init(1024);
diff --git a/be/test/util/broker_storage_backend_test.cpp b/be/test/util/broker_storage_backend_test.cpp
index f8f1611..548ce31 100644
--- a/be/test/util/broker_storage_backend_test.cpp
+++ b/be/test/util/broker_storage_backend_test.cpp
@@ -165,7 +165,7 @@ TEST_F(StorageBackendTest, broker_list) {
status = _broker->direct_upload(_broker_base_path + "/Ode_to_the_West_Wind2.md5", _content);
ASSERT_TRUE(status.ok());
std::map<std::string, FileStat> files;
- status = _broker->list(_broker_base_path, &files);
+ status = _broker->list(_broker_base_path, true, false, &files);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(files.find("Ode_to_the_West_Wind") != files.end());
ASSERT_TRUE(files.find("Ode_to_the_West_Wind1") != files.end());
diff --git a/be/test/util/s3_storage_backend_test.cpp b/be/test/util/s3_storage_backend_test.cpp
index 3e8cc87..ab5dd81 100644
--- a/be/test/util/s3_storage_backend_test.cpp
+++ b/be/test/util/s3_storage_backend_test.cpp
@@ -158,7 +158,7 @@ TEST_F(S3StorageBackendTest, s3_list) {
status = _s3->direct_upload(_s3_base_path + "/Ode_to_the_West_Wind2.md5", _content);
ASSERT_TRUE(status.ok());
std::map<std::string, FileStat> files;
- status = _s3->list(_s3_base_path, &files);
+ status = _s3->list(_s3_base_path, true, false, &files);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(files.find("Ode_to_the_West_Wind") != files.end());
ASSERT_TRUE(files.find("Ode_to_the_West_Wind1") != files.end());
@@ -182,8 +182,6 @@ TEST_F(S3StorageBackendTest, s3_mkdir) {
ASSERT_TRUE(status.ok());
status = _s3->exist(_s3_base_path + "/dir");
ASSERT_TRUE(status.code() == TStatusCode::NOT_FOUND);
- status = _s3->exist(_s3_base_path + "/dir/");
- ASSERT_TRUE(status.ok());
}
} // end namespace doris
diff --git a/be/test/util/zip_util_test.cpp b/be/test/util/zip_util_test.cpp
index 164d72e..72401ad 100644
--- a/be/test/util/zip_util_test.cpp
+++ b/be/test/util/zip_util_test.cpp
@@ -48,7 +48,7 @@ TEST(ZipUtilTest, basic) {
char f[11];
Slice slice(f, 11);
- file->read_at(0, slice);
+ file->read_at(0, &slice);
ASSERT_EQ("hello world", slice.to_string());
@@ -77,7 +77,7 @@ TEST(ZipUtilTest, dir) {
char f[4];
Slice slice(f, 4);
- file->read_at(0, slice);
+ file->read_at(0, &slice);
ASSERT_EQ("test", slice.to_string());
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 96e4c2e..8dce011 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -295,6 +295,12 @@ enum TabletTypePB {
TABLET_TYPE_MEMORY = 1;
}
+enum StorageMediumPB {
+ HDD = 0;
+ SSD = 1;
+ S3 = 2;
+}
+
message TabletMetaPB {
optional int64 table_id = 1; // ?
optional int64 partition_id = 2; // ?
@@ -319,6 +325,7 @@ message TabletMetaPB {
optional RowsetTypePB preferred_rowset_type = 16;
optional TabletTypePB tablet_type = 17;
repeated RowsetMetaPB stale_rs_metas = 18;
+ optional StorageMediumPB storage_medium = 19 [default = HDD];
}
message OLAPIndexHeaderMessage {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 506a97e..c69f0d9 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -46,6 +46,7 @@ enum TStorageType {
enum TStorageMedium {
HDD,
SSD,
+ S3,
}
enum TVarType {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org