You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/28 07:48:28 UTC
[doris] branch master updated: [Feature](remote) Using heavy schema change if the table is not enable light weight schema change (#13487)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eab8876abc [Feature](remote) Using heavy schema change if the table is not enable light weight schema change (#13487)
eab8876abc is described below
commit eab8876abc536cd338c1c7c080b9f92dee8a6d64
Author: pengxiangyu <di...@163.com>
AuthorDate: Fri Oct 28 15:48:22 2022 +0800
[Feature](remote) Using heavy schema change if the table is not enable light weight schema change (#13487)
---
be/src/io/CMakeLists.txt | 1 +
be/src/io/fs/file_system.h | 2 +-
be/src/io/fs/file_system_map.cpp | 4 +-
be/src/io/fs/file_system_map.h | 6 +-
be/src/io/fs/local_file_system.cpp | 7 +-
be/src/io/fs/local_file_system.h | 2 +-
be/src/io/fs/s3_file_system.cpp | 5 +-
be/src/io/fs/s3_file_system.h | 1 -
be/src/io/fs/s3_file_writer.cpp | 243 +++++++++++++++++++++
be/src/io/fs/s3_file_writer.h | 79 +++++++
be/src/olap/data_dir.h | 4 +-
be/src/olap/rowset/beta_rowset.cpp | 55 +++--
be/src/olap/rowset/beta_rowset.h | 7 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 20 +-
be/src/olap/rowset/rowset.h | 3 +
be/src/olap/rowset/rowset_meta.h | 8 +-
be/src/olap/rowset/rowset_writer_context.h | 6 +-
be/src/olap/rowset/segment_v2/segment.cpp | 2 +-
be/src/olap/rowset/segment_v2/segment.h | 6 +-
be/src/olap/schema_change.cpp | 14 +-
be/src/olap/snapshot_manager.cpp | 2 +-
be/src/olap/tablet.cpp | 17 +-
be/src/olap/tablet.h | 6 +
be/src/olap/tablet_meta.cpp | 2 +-
be/src/olap/tablet_meta.h | 2 +-
be/src/util/doris_metrics.cpp | 8 +
be/src/util/doris_metrics.h | 4 +
be/test/olap/rowid_conversion_test.cpp | 2 +-
be/test/olap/rowset/beta_rowset_test.cpp | 2 +-
.../olap/rowset/segment_v2/bitmap_index_test.cpp | 5 +-
30 files changed, 457 insertions(+), 68 deletions(-)
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 0941505ba0..4096d2557b 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -40,6 +40,7 @@ set(IO_FILES
fs/local_file_writer.cpp
fs/s3_file_reader.cpp
fs/s3_file_system.cpp
+ fs/s3_file_writer.cpp
cache/dummy_file_cache.cpp
cache/file_cache.cpp
cache/file_cache_manager.cpp
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index e3c4a19018..e7d4fbbb88 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -80,7 +80,7 @@ protected:
FileSystemType _type;
};
-using FileSystemPtr = std::shared_ptr<FileSystem>;
+using FileSystemSPtr = std::shared_ptr<FileSystem>;
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/file_system_map.cpp b/be/src/io/fs/file_system_map.cpp
index 2467781e53..18d7f454e8 100644
--- a/be/src/io/fs/file_system_map.cpp
+++ b/be/src/io/fs/file_system_map.cpp
@@ -27,12 +27,12 @@ FileSystemMap* FileSystemMap::instance() {
return &map;
}
-void FileSystemMap::insert(ResourceId id, FileSystemPtr fs) {
+void FileSystemMap::insert(ResourceId id, FileSystemSPtr fs) {
std::unique_lock wlock(_mu);
_map.try_emplace(std::move(id), std::move(fs));
}
-FileSystemPtr FileSystemMap::get(const ResourceId& id) {
+FileSystemSPtr FileSystemMap::get(const ResourceId& id) {
std::shared_lock rlock(_mu);
auto it = _map.find(id);
if (it != _map.end()) {
diff --git a/be/src/io/fs/file_system_map.h b/be/src/io/fs/file_system_map.h
index a7a4ef57fc..c208db59aa 100644
--- a/be/src/io/fs/file_system_map.h
+++ b/be/src/io/fs/file_system_map.h
@@ -31,17 +31,17 @@ public:
static FileSystemMap* instance();
~FileSystemMap() = default;
- void insert(ResourceId id, FileSystemPtr fs);
+ void insert(ResourceId id, FileSystemSPtr fs);
// If `id` is not in `_map`, return nullptr.
- FileSystemPtr get(const ResourceId& id);
+ FileSystemSPtr get(const ResourceId& id);
private:
FileSystemMap() = default;
private:
std::shared_mutex _mu;
- std::unordered_map<ResourceId, FileSystemPtr> _map; // GUARED_BY(_mu)
+ std::unordered_map<ResourceId, FileSystemSPtr> _map; // GUARED_BY(_mu)
};
} // namespace io
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 46098e060f..d9e43c21a8 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -142,9 +142,10 @@ Status LocalFileSystem::list(const Path& path, std::vector<Path>* files) {
return Status::OK();
}
-LocalFileSystem* global_local_filesystem() {
- static LocalFileSystem fs("");
- return &fs;
+FileSystemSPtr global_local_filesystem() {
+ // Function-local static: constructed on first call (thread-safe), immune to cross-TU static-init order.
+ static FileSystemSPtr local_fs = std::make_shared<io::LocalFileSystem>("");
+ return local_fs;
}
} // namespace io
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 2363edc77f..1477d0aa99 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -50,7 +50,7 @@ private:
Path absolute_path(const Path& path) const;
};
-LocalFileSystem* global_local_filesystem();
+FileSystemSPtr global_local_filesystem();
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 00a4eed292..68feb418e3 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -19,6 +19,7 @@
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
+#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
@@ -35,6 +36,7 @@
#include "gutil/strings/stringpiece.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_reader.h"
+#include "io/fs/s3_file_writer.h"
namespace doris {
namespace io {
@@ -136,7 +138,8 @@ Status S3FileSystem::batch_upload(const std::vector<Path>& local_paths,
}
Status S3FileSystem::create_file(const Path& path, FileWriterPtr* writer) {
- return Status::NotSupported("not support");
+ *writer = std::make_unique<S3FileWriter>(Path(get_key(path)), get_client(), _s3_conf); // object key is get_key(path); S3 is first contacted on the first append
+ return Status::OK();
}
Status S3FileSystem::open_file(const Path& path, FileReaderSPtr* reader) {
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 9eb393996f..46510d3aa0 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -75,7 +75,6 @@ public:
// Guarded by external lock.
void set_sk(std::string sk) { _s3_conf.sk = std::move(sk); }
-private:
std::string get_key(const Path& path) const;
private:
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
new file mode 100644
index 0000000000..0a5848726a
--- /dev/null
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/s3_file_writer.h"
+
+#include <aws/core/Aws.h>
+#include <aws/core/utils/HashingUtils.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/AbortMultipartUploadRequest.h>
+#include <aws/s3/model/CompleteMultipartUploadRequest.h>
+#include <aws/s3/model/CreateMultipartUploadRequest.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
+#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/UploadPartRequest.h>
+#include <fmt/core.h>
+#include <sys/uio.h>
+
+#include <cerrno>
+
+#include "common/compiler_util.h"
+#include "common/status.h"
+#include "gutil/macros.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/path.h"
+#include "io/fs/s3_file_system.h"
+#include "util/doris_metrics.h"
+
+using Aws::S3::Model::AbortMultipartUploadRequest;
+using Aws::S3::Model::CompletedPart;
+using Aws::S3::Model::CompletedMultipartUpload;
+using Aws::S3::Model::CompleteMultipartUploadRequest;
+using Aws::S3::Model::CreateMultipartUploadRequest;
+using Aws::S3::Model::DeleteObjectRequest;
+using Aws::S3::Model::UploadPartRequest;
+using Aws::S3::Model::UploadPartOutcome;
+
+namespace doris {
+namespace io {
+
+// Part flush threshold: buffered bytes are uploaded as one part once they reach 5MB (S3's minimum size for every part but the last); a part may be larger than this.
+static constexpr int MAX_SIZE_EACH_PART = 5 * 1024 * 1024;
+static constexpr const char* STREAM_TAG = "S3FileWriter"; // allocation tag passed to Aws::MakeShared
+
+S3FileWriter::S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client,
+ const S3Conf& s3_conf)
+ : FileWriter(std::move(path)), _client(std::move(client)), _s3_conf(s3_conf) {
+ DorisMetrics::instance()->s3_file_open_writing->increment(1); // decremented when _close() completes
+ DorisMetrics::instance()->s3_file_writer_total->increment(1); // cumulative count of writers created
+}
+
+S3FileWriter::~S3FileWriter() {
+ if (_closed) return; // _close() completed the upload and already released the open-writer gauge
+ if (_is_open) { WARN_IF_ERROR(abort(), fmt::format("Cannot abort {}", _path.native())); }
+ DorisMetrics::instance()->s3_file_open_writing->increment(-1); // balance the constructor's +1
+}
+
+Status S3FileWriter::close() { // completes the upload; calling it again after success is a no-op
+ return _close();
+}
+
+Status S3FileWriter::abort() { // cancels the in-progress multipart upload, if any
+ if (!_is_open) return Status::OK(); // no upload was created yet (or it was already completed): nothing to abort
+ AbortMultipartUploadRequest request;
+ request.WithBucket(_s3_conf.bucket).WithKey(_path.native()).WithUploadId(_upload_id);
+ auto outcome = _client->AbortMultipartUpload(request);
+ if (outcome.IsSuccess() || // NO_SUCH_UPLOAD / 404: the upload is already gone
+ outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_UPLOAD ||
+ outcome.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
+ LOG(INFO) << "Abort multipart upload successfully. endpoint=" << _s3_conf.endpoint
+ << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native() << ", upload_id=" << _upload_id;
+ return Status::OK();
+ }
+ return Status::IOError(
+ "failed to abort multipart upload(endpoint={}, bucket={}, key={}, upload_id={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, _path.native(), _upload_id,
+ outcome.GetError().GetMessage());
+}
+
+Status S3FileWriter::_open() { // starts the multipart upload and stores its id in _upload_id
+ CreateMultipartUploadRequest create_request;
+ create_request.WithBucket(_s3_conf.bucket).WithKey(_path.native());
+ create_request.SetContentType("text/plain"); // NOTE(review): content type is fixed regardless of the bytes written
+
+ _reset_stream(); // start from an empty part buffer
+ auto outcome = _client->CreateMultipartUpload(create_request);
+
+ if (outcome.IsSuccess()) {
+ _upload_id = outcome.GetResult().GetUploadId();
+ LOG(INFO) << "create multi part upload successfully (endpoint=" << _s3_conf.endpoint
+ << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native()
+ << ") upload_id: " << _upload_id;
+ return Status::OK();
+ }
+ return Status::IOError(
+ "failed to create multi part upload (endpoint={}, bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, _path.native(), outcome.GetError().GetMessage());
+}
+
+// Appends one slice by delegating to appendv().
+Status S3FileWriter::append(const Slice& data) {
+ // s3_bytes_written_total is not incremented here: _close() adds the whole _bytes_appended
+ // once the upload completes, so counting each append as well reported every byte twice
+ // (and only for append(), never for direct appendv() calls).
+ return appendv(&data, 1);
+}
+
+Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
+ DCHECK(!_closed);
+ if (!_is_open) { // the multipart upload is created lazily, on the first write
+ RETURN_IF_ERROR(_open());
+ _is_open = true;
+ }
+
+ for (size_t i = 0; i < data_cnt; i++) {
+ const Slice& result = data[i];
+ _stream_ptr->write(result.data, result.size);
+ _bytes_appended += result.size;
+ }
+ // The put position starts at 0 after _reset_stream() and only write() advances it, so tellp()
+ // equals the number of buffered bytes; unlike str().size() it does not copy the whole buffer
+ // on every call.
+ if (static_cast<std::streamoff>(_stream_ptr->tellp()) >= MAX_SIZE_EACH_PART) {
+ RETURN_IF_ERROR(_upload_part());
+ }
+ return Status::OK();
+}
+
+Status S3FileWriter::_upload_part() { // sends the buffered bytes as the next part; no-op when the buffer is empty
+ if (_stream_ptr->str().size() == 0) {
+ return Status::OK();
+ }
+ ++_cur_part_num;
+
+ UploadPartRequest upload_request;
+ upload_request.WithBucket(_s3_conf.bucket)
+ .WithKey(_path.native())
+ .WithPartNumber(_cur_part_num)
+ .WithUploadId(_upload_id);
+
+ upload_request.SetBody(_stream_ptr);
+
+ Aws::Utils::ByteBuffer part_md5(Aws::Utils::HashingUtils::CalculateMD5(*_stream_ptr)); // Content-MD5 of the part body
+ upload_request.SetContentMD5(Aws::Utils::HashingUtils::Base64Encode(part_md5));
+
+ auto start_pos = _stream_ptr->tellg(); // measure the body length, then restore the read position
+ _stream_ptr->seekg(0LL, _stream_ptr->end);
+ upload_request.SetContentLength(static_cast<long>(_stream_ptr->tellg()));
+ _stream_ptr->seekg(start_pos);
+
+ auto upload_part_callable = _client->UploadPartCallable(upload_request);
+
+ UploadPartOutcome upload_part_outcome = upload_part_callable.get();
+ _reset_stream();
+ if (!upload_part_outcome.IsSuccess()) {
+ return Status::IOError(
+ "failed to upload part (endpoint={}, bucket={}, key={}, part_num = {}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, _path.native(), _cur_part_num,
+ upload_part_outcome.GetError().GetMessage());
+ }
+
+ std::shared_ptr<CompletedPart> completed_part = std::make_shared<CompletedPart>();
+ completed_part->SetPartNumber(_cur_part_num);
+ auto etag = upload_part_outcome.GetResult().GetETag();
+ DCHECK(!etag.empty()); // a successful part upload carries an ETag; _close() sends it back when completing
+ completed_part->SetETag(etag);
+ _completed_parts.emplace_back(completed_part);
+ return Status::OK();
+}
+
+void S3FileWriter::_reset_stream() { // replace the part buffer with a fresh, empty stream
+ _stream_ptr = Aws::MakeShared<Aws::StringStream>(STREAM_TAG, "");
+}
+
+Status S3FileWriter::finalize() { // completes the upload via _close() when one was started
+ DCHECK(!_closed);
+ if (_is_open) {
+ RETURN_IF_ERROR(_close()); // a failed CompleteMultipartUpload must reach the caller
+ }
+ return Status::OK();
+}
+
+Status S3FileWriter::_close() { // shared by close() and finalize(); a no-op once closed
+ if (_closed) {
+ return Status::OK();
+ }
+ if (_is_open) {
+ RETURN_IF_ERROR(_upload_part()); // flush the remaining (possibly small) buffer as the last part
+
+ CompleteMultipartUploadRequest complete_request;
+ complete_request.WithBucket(_s3_conf.bucket)
+ .WithKey(_path.native())
+ .WithUploadId(_upload_id);
+
+ CompletedMultipartUpload completed_upload;
+ for (const auto& part : _completed_parts) {
+ completed_upload.AddParts(*part);
+ }
+
+ complete_request.WithMultipartUpload(completed_upload);
+
+ auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
+
+ if (!complete_outcome.IsSuccess()) { // _closed stays false, so the destructor aborts the upload
+ return Status::IOError(
+ "failed to complete multi part upload (endpoint={}, bucket={}, key={}, upload_id={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, _path.native(), _upload_id,
+ complete_outcome.GetError().GetMessage());
+ }
+ _is_open = false;
+ }
+ _closed = true;
+
+ DorisMetrics::instance()->s3_file_open_writing->increment(-1);
+ DorisMetrics::instance()->s3_file_created_total->increment(1);
+ DorisMetrics::instance()->s3_bytes_written_total->increment(_bytes_appended);
+
+ LOG(INFO) << "complete multi part upload successfully (endpoint=" << _s3_conf.endpoint
+ << ", bucket=" << _s3_conf.bucket << ", key=" << _path.native()
+ << ") upload_id: " << _upload_id;
+ return Status::OK();
+}
+
+Status S3FileWriter::write_at(size_t offset, const Slice& data) { // positional writes are not supported
+ return Status::NotSupported("not support");
+}
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
new file mode 100644
index 0000000000..d3abc19ba8
--- /dev/null
+++ b/be/src/io/fs/s3_file_writer.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <list>
+
+#include "io/fs/file_writer.h"
+#include "util/s3_util.h"
+
+namespace Aws::S3 {
+namespace Model {
+class CompletedPart;
+}
+class S3Client;
+} // namespace Aws::S3
+
+namespace doris {
+namespace io {
+
+class S3FileWriter final : public FileWriter { // writes one S3 object through a multipart upload
+public:
+ S3FileWriter(Path path, std::shared_ptr<Aws::S3::S3Client> client, const S3Conf& s3_conf); // path is used as the object key in s3_conf.bucket
+ ~S3FileWriter() override; // aborts the upload if the writer was not closed
+
+ Status close() override; // when an upload is in progress, uploads the remaining buffer as the last part and completes it; no-op once closed
+
+ Status abort() override; // aborts the multipart upload; NoSuchUpload / 404 counts as success
+
+ Status append(const Slice& data) override; // single-slice form of appendv()
+
+ Status appendv(const Slice* data, size_t data_cnt) override; // opens the upload on first use, buffers data, uploads a part once >= MAX_SIZE_EACH_PART (5MB) is buffered
+
+ Status write_at(size_t offset, const Slice& data) override; // not supported
+
+ Status finalize() override; // runs _close() when an upload was started
+
+ size_t bytes_appended() const override { return _bytes_appended; }
+
+private:
+ Status _close(); // shared implementation of close() and finalize()
+
+ Status _open(); // CreateMultipartUpload; fills _upload_id
+
+ Status _upload_part(); // uploads the buffer as the next part and records its ETag
+
+ void _reset_stream(); // replaces the buffer with a new empty stream
+
+private:
+ std::shared_ptr<Aws::S3::S3Client> _client;
+ S3Conf _s3_conf;
+ std::string _upload_id; // id returned by CreateMultipartUpload
+ bool _is_open = false; // multipart upload created and not yet completed
+ bool _closed = false; // set once _close() has finished
+ size_t _bytes_appended = 0; // total bytes passed to append()/appendv()
+
+ std::shared_ptr<Aws::StringStream> _stream_ptr; // bytes buffered for the next part. NOTE(review): Aws::StringStream presumably arrives via util/s3_util.h; consider including <aws/core/utils/memory/stl/AWSStringStream.h> directly
+ // Number of the most recently uploaded part; the first uploaded part gets number 1.
+ int _cur_part_num = 0;
+ std::list<std::shared_ptr<Aws::S3::Model::CompletedPart>> _completed_parts; // part number + ETag of every uploaded part, in upload order
+};
+
+} // namespace io
+} // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 4cf40fd1bd..20b23c1405 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -55,7 +55,7 @@ public:
const std::string& path() const { return _path; }
size_t path_hash() const { return _path_hash; }
- const io::FileSystemPtr& fs() const { return _fs; }
+ const io::FileSystemSPtr& fs() const { return _fs; }
bool is_used() const { return _is_used; }
void set_is_used(bool is_used) { _is_used = is_used; }
@@ -169,7 +169,7 @@ private:
std::string _path;
size_t _path_hash;
- io::FileSystemPtr _fs;
+ io::FileSystemSPtr _fs;
// user specified capacity
int64_t _capacity_bytes;
// the actual available capacity of the disk of this data dir
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 5f1985bd8f..0d99676ecc 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -38,15 +38,12 @@ namespace doris {
using io::FileCacheManager;
std::string BetaRowset::segment_file_path(int segment_id) {
- if (is_local()) {
- return local_segment_path(_tablet_path, rowset_id(), segment_id);
- }
#ifdef BE_TEST
if (!config::file_cache_type.empty()) {
- return local_segment_path(_tablet_path, rowset_id(), segment_id);
+ return segment_file_path(_tablet_path, rowset_id(), segment_id);
}
#endif
- return remote_segment_path(_rowset_meta->tablet_id(), rowset_id(), segment_id);
+ return segment_file_path(_rowset_dir, rowset_id(), segment_id);
}
std::string BetaRowset::segment_cache_path(int segment_id) {
@@ -54,34 +51,38 @@ std::string BetaRowset::segment_cache_path(int segment_id) {
return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id);
}
-std::string BetaRowset::local_segment_path(const std::string& tablet_path,
- const RowsetId& rowset_id, int segment_id) {
- // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}.dat
- return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(), segment_id);
+std::string BetaRowset::segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id,
+ int segment_id) {
+ // {rowset_dir}/{rowset_id}_{seg_num}.dat, e.g. rowset_dir = local tablet path, or data/{tablet_id} for remote rowsets
+ return fmt::format("{}/{}_{}.dat", rowset_dir, rowset_id.to_string(), segment_id);
}
-std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
- int segment_id) {
- // data/{tablet_id}/{rowset_id}_{seg_num}.dat
- return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id, segment_id);
+std::string BetaRowset::remote_tablet_path(int64_t tablet_id) {
+ // data/{tablet_id}
+ return fmt::format("{}/{}", DATA_PREFIX, tablet_id);
}
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id,
int segment_id) {
// data/{tablet_id}/{rowset_id}_{seg_num}.dat
- return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id.to_string(),
- segment_id);
+ return remote_segment_path(tablet_id, rowset_id.to_string(), segment_id);
}
-std::string BetaRowset::local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id,
- int segment_id) {
- // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
- return fmt::format("{}/{}_{}", tablet_path, rowset_id.to_string(), segment_id);
+std::string BetaRowset::remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
+ int segment_id) {
+ // data/{tablet_id}/{rowset_id}_{seg_num}.dat
+ return fmt::format("{}/{}_{}.dat", remote_tablet_path(tablet_id), rowset_id, segment_id);
}
BetaRowset::BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta)
- : Rowset(schema, tablet_path, std::move(rowset_meta)) {}
+ : Rowset(schema, tablet_path, std::move(rowset_meta)) {
+ if (_rowset_meta->is_local()) {
+ _rowset_dir = tablet_path;
+ } else {
+ _rowset_dir = remote_tablet_path(_rowset_meta->tablet_id());
+ }
+}
BetaRowset::~BetaRowset() = default;
@@ -188,9 +189,10 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id)
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (int i = 0; i < num_segments(); ++i) {
- auto dst_path = local_segment_path(dir, new_rowset_id, i);
+ auto dst_path = segment_file_path(dir, new_rowset_id, i);
// TODO(lingbin): use Env API? or EnvUtil?
- if (FileUtils::check_exist(dst_path)) {
+ bool dst_path_exist = false;
+ if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
LOG(WARNING) << "failed to create hard link, file already exist: " << dst_path;
return Status::OLAPInternalError(OLAP_ERR_FILE_ALREADY_EXIST);
}
@@ -209,7 +211,7 @@ Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id)
Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
DCHECK(is_local());
for (int i = 0; i < num_segments(); ++i) {
- auto dst_path = local_segment_path(dir, new_rowset_id, i);
+ auto dst_path = segment_file_path(dir, new_rowset_id, i);
Status status = Env::Default()->path_exists(dst_path);
if (status.ok()) {
LOG(WARNING) << "file already exist: " << dst_path;
@@ -266,7 +268,12 @@ bool BetaRowset::check_path(const std::string& path) {
bool BetaRowset::check_file_exist() {
for (int i = 0; i < num_segments(); ++i) {
auto seg_path = segment_file_path(i);
- if (!Env::Default()->path_exists(seg_path).ok()) {
+ auto fs = _rowset_meta->fs();
+ if (!fs) {
+ return false;
+ }
+ bool seg_file_exist = false;
+ if (!fs->exists(seg_path, &seg_file_exist).ok() || !seg_file_exist) {
LOG(WARNING) << "data file not existed: " << seg_path
<< " for rowset_id: " << rowset_id();
return false;
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index dde891d6cb..1d4153f701 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -46,8 +46,8 @@ public:
std::string segment_cache_path(int segment_id);
- static std::string local_segment_path(const std::string& tablet_path, const RowsetId& rowset_id,
- int segment_id);
+ static std::string segment_file_path(const std::string& rowset_dir, const RowsetId& rowset_id,
+ int segment_id);
static std::string remote_segment_path(int64_t tablet_id, const RowsetId& rowset_id,
int segment_id);
@@ -55,8 +55,7 @@ public:
static std::string remote_segment_path(int64_t tablet_id, const std::string& rowset_id,
int segment_id);
- static std::string local_cache_path(const std::string& tablet_path, const RowsetId& rowset_id,
- int segment_id);
+ static std::string remote_tablet_path(int64_t tablet_id);
Status split_range(const RowCursor& start_key, const RowCursor& end_key,
uint64_t request_block_row_count, size_t key_num,
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 13532ee269..288d8b91f7 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -53,8 +53,8 @@ BetaRowsetWriter::~BetaRowsetWriter() {
return;
}
for (int i = 0; i < _num_segment; ++i) {
- auto seg_path =
- BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, i);
+ std::string seg_path =
+ BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i);
// Even if an error is encountered, these files that have not been cleaned up
// will be cleaned up by the GC background. So here we only print the error
// message when we encounter an error.
@@ -67,8 +67,13 @@ BetaRowsetWriter::~BetaRowsetWriter() {
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
_context = rowset_writer_context;
_rowset_meta.reset(new RowsetMeta);
- if (_context.data_dir) {
+ if (_context.fs == nullptr && _context.data_dir) {
_rowset_meta->set_fs(_context.data_dir->fs());
+ } else {
+ _rowset_meta->set_fs(_context.fs);
+ }
+ if (_context.fs != nullptr && _context.fs->resource_id().size() > 0) {
+ _rowset_meta->set_resource_id(_context.fs->resource_id());
}
_rowset_meta->set_rowset_id(_context.rowset_id);
_rowset_meta->set_partition_id(_context.partition_id);
@@ -156,7 +161,7 @@ template Status BetaRowsetWriter::_add_row(const ContiguousRow& row);
Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
- RETURN_NOT_OK(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
+ RETURN_NOT_OK(rowset->link_files_to(_context.rowset_dir, _context.rowset_id));
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
@@ -250,7 +255,7 @@ RowsetSharedPtr BetaRowsetWriter::build() {
}
RowsetSharedPtr rowset;
- auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
+ auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
_rowset_meta, &rowset);
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
@@ -286,7 +291,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {
_build_rowset_meta(rowset_meta_);
RowsetSharedPtr rowset;
- auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
+ auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
rowset_meta_, &rowset);
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
@@ -298,8 +303,7 @@ RowsetSharedPtr BetaRowsetWriter::build_tmp() {
Status BetaRowsetWriter::_create_segment_writer(
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
int32_t segment_id = _num_segment.fetch_add(1);
- auto path =
- BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, segment_id);
+ auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index bda106d462..32fda0e102 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -229,6 +229,8 @@ public:
const std::string& tablet_path() const { return _tablet_path; }
+ virtual std::string rowset_dir() { return _rowset_dir; }
+
static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
return left->end_version() < right->end_version();
}
@@ -291,6 +293,7 @@ protected:
TabletSchemaSPtr _schema;
std::string _tablet_path;
+ std::string _rowset_dir;
RowsetMetaSharedPtr _rowset_meta;
// init in constructor
bool _is_pending; // rowset is pending iff it's not in visible state
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 69b4f65911..7154116e7f 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -82,7 +82,7 @@ public:
}
// This method may return nullptr.
- io::FileSystem* fs() {
+ io::FileSystemSPtr fs() {
if (!_fs) {
if (is_local()) {
return io::global_local_filesystem();
@@ -91,10 +91,10 @@ public:
LOG_IF(WARNING, !_fs) << "Cannot get file system: " << resource_id();
}
}
- return _fs.get();
+ return _fs;
}
- void set_fs(io::FileSystemPtr fs) { _fs = std::move(fs); }
+ void set_fs(io::FileSystemSPtr fs) { _fs = std::move(fs); }
const io::ResourceId& resource_id() const { return _rowset_meta_pb.resource_id(); }
@@ -396,7 +396,7 @@ private:
RowsetMetaPB _rowset_meta_pb;
TabletSchemaSPtr _schema = nullptr;
RowsetId _rowset_id;
- io::FileSystemPtr _fs;
+ io::FileSystemSPtr _fs;
bool _is_removed_from_rowset_meta = false;
};
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index b01e6fca46..8fef7bb16a 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -34,6 +34,7 @@ struct RowsetWriterContext {
tablet_schema_hash(0),
partition_id(0),
rowset_type(ALPHA_ROWSET),
+ fs(nullptr),
tablet_schema(nullptr),
rowset_state(PREPARED),
version(Version(0, 0)),
@@ -54,7 +55,7 @@ struct RowsetWriterContext {
context.partition_id = new_tablet->partition_id();
context.tablet_schema_hash = new_tablet->schema_hash();
context.rowset_type = new_rowset_type;
- context.tablet_path = new_tablet->tablet_path();
+ context.rowset_dir = new_tablet->tablet_path();
context.tablet_schema = new_tablet->tablet_schema();
context.data_dir = new_tablet->data_dir();
context.rowset_state = VISIBLE;
@@ -69,7 +70,8 @@ struct RowsetWriterContext {
int64_t tablet_schema_hash;
int64_t partition_id;
RowsetTypePB rowset_type;
- std::string tablet_path;
+ io::FileSystemSPtr fs = nullptr;
+ std::string rowset_dir = "";
TabletSchemaSPtr tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index cc59a15ae1..056665d295 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -42,7 +42,7 @@ namespace segment_v2 {
using io::FileCacheManager;
-Status Segment::open(io::FileSystem* fs, const std::string& path, const std::string& cache_path,
+Status Segment::open(io::FileSystemSPtr fs, const std::string& path, const std::string& cache_path,
uint32_t segment_id, TabletSchemaSPtr tablet_schema,
std::shared_ptr<Segment>* output) {
std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema));
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index fcdf965fd5..a3dcd8c6c6 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -61,9 +61,9 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
// change finished, client should disable all cached Segment for old TabletSchema.
class Segment : public std::enable_shared_from_this<Segment> {
public:
- static Status open(io::FileSystem* fs, const std::string& path, const std::string& cache_path,
- uint32_t segment_id, TabletSchemaSPtr tablet_schema,
- std::shared_ptr<Segment>* output);
+ static Status open(io::FileSystemSPtr fs, const std::string& path,
+ const std::string& cache_path, uint32_t segment_id,
+ TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output);
~Segment();
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 45a8e4268b..c4caf90567 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -2186,7 +2186,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
rs_reader->version(), VISIBLE,
rs_reader->rowset()->rowset_meta()->segments_overlap(), new_tablet->tablet_schema(),
rs_reader->oldest_write_timestamp(), rs_reader->newest_write_timestamp(),
- &rowset_writer);
+ rs_reader->rowset()->rowset_meta()->fs(), &rowset_writer);
if (!status.ok()) {
res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
return process_alter_exit();
@@ -2374,6 +2374,18 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
*sc_directly = true;
}
+ // if rs_reader has remote files, link schema change is not supported,
+ // use directly schema change instead.
+ if (!(*sc_directly) && !(*sc_sorting)) {
+ // check has remote rowset
+ for (auto& rs_reader : sc_params.ref_rowset_readers) {
+ if (!rs_reader->rowset()->is_local()) {
+ *sc_directly = true;
+ break;
+ }
+ }
+ }
+
return Status::OK();
}
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index a6f2fa0764..904ef62baf 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -255,7 +255,7 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb,
context.partition_id = org_rowset_meta->partition_id();
context.tablet_schema_hash = org_rowset_meta->tablet_schema_hash();
context.rowset_type = org_rowset_meta->rowset_type();
- context.tablet_path = new_tablet_path;
+ context.rowset_dir = new_tablet_path;
context.tablet_schema =
org_rowset_meta->tablet_schema() ? org_rowset_meta->tablet_schema() : tablet_schema;
context.rowset_state = org_rowset_meta->rowset_state();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4524f1e89f..58f6f226d8 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1661,6 +1661,16 @@ Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB&
TabletSchemaSPtr tablet_schema, int64_t oldest_write_timestamp,
int64_t newest_write_timestamp,
std::unique_ptr<RowsetWriter>* rowset_writer) {
+ return create_rowset_writer(version, rowset_state, overlap, tablet_schema,
+ oldest_write_timestamp, newest_write_timestamp, nullptr,
+ rowset_writer);
+}
+
+Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ TabletSchemaSPtr tablet_schema, int64_t oldest_write_timestamp,
+ int64_t newest_write_timestamp, io::FileSystemSPtr fs,
+ std::unique_ptr<RowsetWriter>* rowset_writer) {
RowsetWriterContext context;
context.version = version;
context.rowset_state = rowset_state;
@@ -1669,6 +1679,7 @@ Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB&
context.newest_write_timestamp = newest_write_timestamp;
context.tablet_schema = tablet_schema;
context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
+ context.fs = fs;
_init_context_common_fields(context);
return RowsetFactory::create_rowset_writer(context, rowset_writer);
}
@@ -1704,7 +1715,11 @@ void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
if (context.rowset_type == ALPHA_ROWSET) {
context.rowset_type = StorageEngine::instance()->default_rowset_type();
}
- context.tablet_path = tablet_path();
+ if (context.fs != nullptr && context.fs->type() != io::FileSystemType::LOCAL) {
+ context.rowset_dir = BetaRowset::remote_tablet_path(tablet_id());
+ } else {
+ context.rowset_dir = tablet_path();
+ }
context.data_dir = data_dir();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 938c1f38a2..318fdc8748 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -301,6 +301,12 @@ public:
int64_t oldest_write_timestamp, int64_t newest_write_timestamp,
std::unique_ptr<RowsetWriter>* rowset_writer);
+ Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap, TabletSchemaSPtr tablet_schema,
+ int64_t oldest_write_timestamp, int64_t newest_write_timestamp,
+ io::FileSystemSPtr fs,
+ std::unique_ptr<RowsetWriter>* rowset_writer);
+
Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id,
const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap,
TabletSchemaSPtr tablet_schema,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 6a3555575c..70348ba110 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -397,7 +397,7 @@ Status TabletMeta::deserialize(const string& meta_binary) {
return Status::OK();
}
-void TabletMeta::init_rs_metas_fs(const io::FileSystemPtr& fs) {
+void TabletMeta::init_rs_metas_fs(const io::FileSystemSPtr& fs) {
for (auto& rs_meta : _rs_metas) {
if (rs_meta->is_local()) {
rs_meta->set_fs(fs);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index d61a20dfa3..3655b71602 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -111,7 +111,7 @@ public:
Status deserialize(const std::string& meta_binary);
void init_from_pb(const TabletMetaPB& tablet_meta_pb);
// Init `RowsetMeta._fs` if rowset is local.
- void init_rs_metas_fs(const io::FileSystemPtr& fs);
+ void init_rs_metas_fs(const io::FileSystemSPtr& fs);
void to_meta_pb(TabletMetaPB* tablet_meta_pb);
void to_json(std::string* json_string, json2pb::Pb2JsonOptions& options);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index f8a27e4f6a..15be14b06e 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -168,14 +168,18 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(upload_fail_count, MetricUnit::ROWSETS);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_reader_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_file_writer_total, MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_writer_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(file_created_total, MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_file_created_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_bytes_read_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_read_total, MetricUnit::FILESYSTEM);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(local_bytes_written_total, MetricUnit::FILESYSTEM);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(s3_bytes_written_total, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_reading, MetricUnit::FILESYSTEM);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_file_open_writing, MetricUnit::FILESYSTEM);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(s3_file_open_writing, MetricUnit::FILESYSTEM);
const std::string DorisMetrics::_s_registry_name = "doris_be";
const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -289,13 +293,17 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_reader_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_file_writer_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_writer_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, file_created_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_file_created_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_bytes_read_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_read_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, local_bytes_written_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity, s3_bytes_written_total);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_reading);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, local_file_open_writing);
+ INT_GAUGE_METRIC_REGISTER(_server_metric_entity, s3_file_open_writing);
}
void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index da9085671e..fe0689f23c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -155,13 +155,17 @@ public:
IntCounter* local_file_reader_total;
IntCounter* s3_file_reader_total;
IntCounter* local_file_writer_total;
+ IntCounter* s3_file_writer_total;
IntCounter* file_created_total;
+ IntCounter* s3_file_created_total;
IntCounter* local_bytes_read_total;
IntCounter* s3_bytes_read_total;
IntCounter* local_bytes_written_total;
+ IntCounter* s3_bytes_written_total;
IntGauge* local_file_open_reading;
IntGauge* s3_file_open_reading;
IntGauge* local_file_open_writing;
+ IntGauge* s3_file_open_writing;
// Size of some global containers
UIntGauge* rowset_count_generated_and_in_use;
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index d105223f55..27b43fec3c 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -131,7 +131,7 @@ protected:
rowset_writer_context->data_dir = _data_dir.get();
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
- rowset_writer_context->tablet_path = "tablet_path";
+ rowset_writer_context->rowset_dir = "tablet_path";
rowset_writer_context->version = Version(inc_id, inc_id);
rowset_writer_context->segments_overlap = overlap;
rowset_writer_context->max_rows_per_segment = max_rows_per_segment;
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index 5dd53cd490..2df689fc04 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -147,7 +147,7 @@ protected:
rowset_writer_context->tablet_schema_hash = 1111;
rowset_writer_context->partition_id = 10;
rowset_writer_context->rowset_type = BETA_ROWSET;
- rowset_writer_context->tablet_path = kTestDir;
+ rowset_writer_context->rowset_dir = kTestDir;
rowset_writer_context->rowset_state = VISIBLE;
rowset_writer_context->tablet_schema = tablet_schema;
rowset_writer_context->version.first = 10;
diff --git a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
index 3d2d23abed..fc1e9cd62f 100644
--- a/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bitmap_index_test.cpp
@@ -35,6 +35,9 @@
#include "util/file_utils.h"
namespace doris {
+
+using FileSystemSPtr = std::shared_ptr<io::FileSystem>;
+
namespace segment_v2 {
using roaring::Roaring;
@@ -56,7 +59,7 @@ public:
};
template <FieldType type>
-void write_index_file(const std::string& filename, io::FileSystem* fs, const void* values,
+void write_index_file(const std::string& filename, FileSystemSPtr fs, const void* values,
size_t value_count, size_t null_count, ColumnIndexMetaPB* meta) {
const auto* type_info = get_scalar_type_info<type>();
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org