You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 15:57:09 UTC
[incubator-doris] branch master updated: [refactor][rowset]move rowset writer to a single place (#9368)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c79d223e4 [refactor][rowset]move rowset writer to a single place (#9368)
2c79d223e4 is described below
commit 2c79d223e48eb26573a39c16b55f7a571ed59f6e
Author: yiguolei <67...@qq.com>
AuthorDate: Thu May 19 23:57:02 2022 +0800
[refactor][rowset]move rowset writer to a single place (#9368)
---
be/src/olap/base_tablet.h | 10 ----
be/src/olap/compaction.cpp | 22 +------
be/src/olap/data_dir.cpp | 6 +-
be/src/olap/delta_writer.cpp | 25 +-------
be/src/olap/push_handler.cpp | 47 ++-------------
be/src/olap/schema_change.cpp | 88 +++++----------------------
be/src/olap/schema_change.h | 3 +-
be/src/olap/storage_migration_v2.cpp | 2 +-
be/src/olap/tablet.cpp | 96 ++++++++++++++++++++++++++----
be/src/olap/tablet.h | 15 ++++-
be/src/olap/tablet_manager.cpp | 111 +++++------------------------------
be/src/olap/tablet_manager.h | 1 -
be/src/olap/tablet_meta.cpp | 1 +
13 files changed, 145 insertions(+), 282 deletions(-)
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 20fe1a5f16..7c97247958 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -56,8 +56,6 @@ public:
int64_t tablet_id() const;
int32_t schema_hash() const;
int16_t shard_id();
- const int64_t creation_time() const;
- void set_creation_time(int64_t creation_time);
bool equal(int64_t tablet_id, int32_t schema_hash);
// properties encapsulated in TabletSchema
@@ -133,14 +131,6 @@ inline int16_t BaseTablet::shard_id() {
return _tablet_meta->shard_id();
}
-inline const int64_t BaseTablet::creation_time() const {
- return _tablet_meta->creation_time();
-}
-
-inline void BaseTablet::set_creation_time(int64_t creation_time) {
- _tablet_meta->set_creation_time(creation_time);
-}
-
inline bool BaseTablet::equal(int64_t id, int32_t hash) {
return (tablet_id() == id) && (schema_hash() == hash);
}
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 245a0fd4eb..fdc07fc725 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -18,7 +18,6 @@
#include "olap/compaction.h"
#include "gutil/strings/substitute.h"
-#include "olap/rowset/rowset_factory.h"
#include "util/time.h"
#include "util/trace.h"
@@ -160,25 +159,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
}
Status Compaction::construct_output_rowset_writer() {
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = _tablet->tablet_uid();
- context.tablet_id = _tablet->tablet_id();
- context.partition_id = _tablet->partition_id();
- context.tablet_schema_hash = _tablet->schema_hash();
- context.data_dir = _tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = _tablet->tablet_path_desc();
- context.tablet_schema = &(_tablet->tablet_schema());
- context.rowset_state = VISIBLE;
- context.version = _output_version;
- context.segments_overlap = NONOVERLAPPING;
- // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool
- RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer));
- return Status::OK();
+ return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
+ &_output_rs_writer);
}
Status Compaction::construct_input_rowset_readers() {
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 862d5e02eb..13f4648690 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -37,7 +37,6 @@
#include "olap/file_helper.h"
#include "olap/olap_define.h"
#include "olap/rowset/alpha_rowset_meta.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta_manager.h"
@@ -469,8 +468,7 @@ Status DataDir::load() {
continue;
}
RowsetSharedPtr rowset;
- Status create_status = RowsetFactory::create_rowset(
- &tablet->tablet_schema(), tablet->tablet_path_desc(), rowset_meta, &rowset);
+ Status create_status = tablet->create_rowset(rowset_meta, &rowset);
if (!create_status) {
LOG(WARNING) << "could not create rowset from rowsetmeta: "
<< " rowset_id: " << rowset_meta->rowset_id()
@@ -498,7 +496,7 @@ Status DataDir::load() {
}
} else if (rowset_meta->rowset_state() == RowsetStatePB::VISIBLE &&
rowset_meta->tablet_uid() == tablet->tablet_uid()) {
- Status publish_status = tablet->add_rowset(rowset, false);
+ Status publish_status = tablet->add_rowset(rowset);
if (!publish_status &&
publish_status.precise_code() != OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "add visible rowset to tablet failed rowset_id:"
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index b40677e2b0..613a742195 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -20,7 +20,6 @@
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
@@ -115,33 +114,15 @@ Status DeltaWriter::init() {
_req.txn_id, _req.load_id));
}
- RowsetWriterContext writer_context;
- writer_context.rowset_id = _storage_engine->next_rowset_id();
- writer_context.tablet_uid = _tablet->tablet_uid();
- writer_context.tablet_id = _req.tablet_id;
- writer_context.partition_id = _req.partition_id;
- writer_context.tablet_schema_hash = _req.schema_hash;
- if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- writer_context.rowset_type = BETA_ROWSET;
- } else {
- writer_context.rowset_type = ALPHA_ROWSET;
- }
- writer_context.path_desc = _tablet->tablet_path_desc();
- writer_context.tablet_schema = &(_tablet->tablet_schema());
- writer_context.rowset_state = PREPARED;
- writer_context.txn_id = _req.txn_id;
- writer_context.load_id = _req.load_id;
- writer_context.segments_overlap = OVERLAPPING;
- writer_context.data_dir = _tablet->data_dir();
- RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer));
-
+ RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
+ &_rowset_writer));
_tablet_schema = &(_tablet->tablet_schema());
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();
// create flush handler
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(
- &_flush_token, writer_context.rowset_type, _req.is_high_priority));
+ &_flush_token, _rowset_writer->type(), _req.is_high_priority));
_is_init = true;
return Status::OK();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 9e1bcaccee..02e0d46dc9 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -25,7 +25,6 @@
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
@@ -221,29 +220,12 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
// 1. init RowsetBuilder of cur_tablet for current push
VLOG_NOTICE << "init rowset builder. tablet=" << cur_tablet->full_name()
<< ", block_row_size=" << cur_tablet->num_rows_per_row_block();
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = cur_tablet->tablet_uid();
- context.tablet_id = cur_tablet->tablet_id();
- context.partition_id = _request.partition_id;
- context.tablet_schema_hash = cur_tablet->schema_hash();
- context.data_dir = cur_tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = cur_tablet->tablet_path_desc();
- context.tablet_schema = &(cur_tablet->tablet_schema());
- context.rowset_state = PREPARED;
- context.txn_id = _request.transaction_id;
- context.load_id = load_id;
// although the spark load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
- context.segments_overlap = OVERLAP_UNKNOWN;
-
std::unique_ptr<RowsetWriter> rowset_writer;
- res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+ res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
+ OVERLAP_UNKNOWN, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
@@ -407,30 +389,9 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab
}
// 2. init RowsetBuilder of cur_tablet for current push
- VLOG_NOTICE << "init RowsetBuilder.";
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = cur_tablet->tablet_uid();
- context.tablet_id = cur_tablet->tablet_id();
- context.partition_id = _request.partition_id;
- context.tablet_schema_hash = cur_tablet->schema_hash();
- context.data_dir = cur_tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = cur_tablet->tablet_path_desc();
- context.tablet_schema = &(cur_tablet->tablet_schema());
- context.rowset_state = PREPARED;
- context.txn_id = _request.transaction_id;
- context.load_id = load_id;
- // although the hadoop load output files are fully sorted,
- // but it depends on thirparty implementation, so we conservatively
- // set this value to OVERLAP_UNKNOWN
- context.segments_overlap = OVERLAP_UNKNOWN;
-
std::unique_ptr<RowsetWriter> rowset_writer;
- res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+ res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
+ OVERLAP_UNKNOWN, &rowset_writer);
if (!res.ok()) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 871f6bb64a..b382c862ea 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -29,7 +29,6 @@
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@@ -1160,8 +1159,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
reset_merged_rows();
reset_filtered_rows();
- bool use_beta_rowset = new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET;
-
SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
@@ -1191,14 +1188,10 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// enter here while memory limitation is reached.
RowsetSharedPtr rowset;
- RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type();
- if (use_beta_rowset) {
- new_rowset_type = BETA_ROWSET;
- }
if (!_internal_sorting(
row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
- new_tablet, new_rowset_type, segments_overlap, &rowset)) {
+ new_tablet, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
@@ -1247,13 +1240,9 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// enter here while memory limitation is reached.
RowsetSharedPtr rowset = nullptr;
- RowsetTypePB new_rowset_type = rowset_reader->rowset()->rowset_meta()->rowset_type();
- if (use_beta_rowset) {
- new_rowset_type = BETA_ROWSET;
- }
if (!_internal_sorting(row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
- new_tablet, new_rowset_type, segments_overlap, &rowset)) {
+ new_tablet, segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
@@ -1305,31 +1294,16 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& version, TabletSharedPtr new_tablet,
- RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap,
RowsetSharedPtr* rowset) {
uint64_t merged_rows = 0;
RowBlockMerger merger(new_tablet);
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = new_tablet->tablet_uid();
- context.tablet_id = new_tablet->tablet_id();
- context.partition_id = new_tablet->partition_id();
- context.tablet_schema_hash = new_tablet->schema_hash();
- context.rowset_type = new_rowset_type;
- context.path_desc = new_tablet->tablet_path_desc();
- context.tablet_schema = &(new_tablet->tablet_schema());
- context.data_dir = new_tablet->data_dir();
- context.rowset_state = VISIBLE;
- context.version = version;
- context.segments_overlap = segments_overlap;
-
VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
<< ", block_row_size=" << new_tablet->num_rows_per_row_block();
std::unique_ptr<RowsetWriter> rowset_writer;
- if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != Status::OK()) {
+ if (!new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer)) {
return false;
}
@@ -1699,28 +1673,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
RETURN_NOT_OK(rowset_reader->init(&reader_context));
-
- RowsetWriterContext writer_context;
- writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
- writer_context.tablet_uid = new_tablet->tablet_uid();
- writer_context.tablet_id = new_tablet->tablet_id();
- writer_context.partition_id = (*base_rowset)->partition_id();
- writer_context.tablet_schema_hash = new_tablet->schema_hash();
- writer_context.data_dir = new_tablet->data_dir();
- writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
- if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- writer_context.rowset_type = BETA_ROWSET;
- }
- writer_context.path_desc = new_tablet->tablet_path_desc();
- writer_context.tablet_schema = &(new_tablet->tablet_schema());
- writer_context.rowset_state = PREPARED;
- writer_context.txn_id = (*base_rowset)->txn_id();
- writer_context.load_id.set_hi((*base_rowset)->load_id().hi());
- writer_context.load_id.set_lo((*base_rowset)->load_id().lo());
- writer_context.segments_overlap = (*base_rowset)->rowset_meta()->segments_overlap();
-
+ PUniqueId load_id;
+ load_id.set_hi((*base_rowset)->load_id().hi());
+ load_id.set_lo((*base_rowset)->load_id().lo());
std::unique_ptr<RowsetWriter> rowset_writer;
- RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+ RETURN_NOT_OK(new_tablet->create_rowset_writer(
+ (*base_rowset)->txn_id(), load_id, PREPARED,
+ (*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer));
if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
base_tablet)) != Status::OK()) {
@@ -1842,29 +1801,12 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// As long as there is a new_table as running, ref table is set as running
// NOTE If the first sub_table fails first, it will continue to go as normal here
TabletSharedPtr new_tablet = sc_params.new_tablet;
-
- RowsetWriterContext writer_context;
- writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
- writer_context.tablet_uid = new_tablet->tablet_uid();
- writer_context.tablet_id = new_tablet->tablet_id();
- writer_context.partition_id = new_tablet->partition_id();
- writer_context.tablet_schema_hash = new_tablet->schema_hash();
- writer_context.data_dir = new_tablet->data_dir();
- // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
- writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
- if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- // Use beta rowset to do schema change
- // And in this case, linked schema change will not be used.
- writer_context.rowset_type = BETA_ROWSET;
- }
- writer_context.path_desc = new_tablet->tablet_path_desc();
- writer_context.tablet_schema = &(new_tablet->tablet_schema());
- writer_context.rowset_state = VISIBLE;
- writer_context.version = rs_reader->version();
- writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
-
+ // When the tablet creates a new rowset writer it may change the rowset type; in that case
+ // linked schema change will not be used.
std::unique_ptr<RowsetWriter> rowset_writer;
- Status status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+ Status status = new_tablet->create_rowset_writer(
+ rs_reader->version(), VISIBLE,
+ rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer);
if (!status.ok()) {
res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
goto PROCESS_ALTER_EXIT;
@@ -1889,7 +1831,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
LOG(WARNING) << "failed to build rowset, exit alter process";
goto PROCESS_ALTER_EXIT;
}
- res = sc_params.new_tablet->add_rowset(new_rowset, false);
+ res = sc_params.new_tablet->add_rowset(new_rowset);
if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occurred. "
<< "tablet=" << sc_params.new_tablet->full_name() << ", version='"
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index ec1f2dc0f2..dc382ececd 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -162,8 +162,7 @@ public:
private:
bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& temp_delta_versions, TabletSharedPtr new_tablet,
- RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
- RowsetSharedPtr* rowset);
+ SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset);
bool _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet);
diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp
index 4bd59e16b2..aed8fc9621 100644
--- a/be/src/olap/storage_migration_v2.cpp
+++ b/be/src/olap/storage_migration_v2.cpp
@@ -372,7 +372,7 @@ Status StorageMigrationV2Handler::_convert_historical_rowsets(
LOG(WARNING) << "failed to build rowset, exit alter process";
goto PROCESS_ALTER_EXIT;
}
- res = sm_params.new_tablet->add_rowset(new_rowset, false);
+ res = sm_params.new_tablet->add_rowset(new_rowset);
if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occurred. "
<< "tablet=" << sm_params.new_tablet->full_name() << ", version='"
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index fb473cf3d9..cb073a8558 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -206,7 +206,7 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowset
return res;
}
-Status Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
+Status Tablet::add_rowset(RowsetSharedPtr rowset) {
DCHECK(rowset != nullptr);
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
// If the rowset already exist, just return directly. The rowset_id is an unique-id,
@@ -235,15 +235,6 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset, bool need_persist) {
}
std::vector<RowsetSharedPtr> empty_vec;
modify_rowsets(empty_vec, rowsets_to_delete);
-
- if (need_persist) {
- Status res =
- RowsetMetaManager::save(data_dir()->get_meta(), tablet_uid(), rowset->rowset_id(),
- rowset->rowset_meta()->get_rowset_pb());
- if (!res.ok()) {
- LOG(FATAL) << "failed to save rowset to local meta store" << rowset->rowset_id();
- }
- }
++_newly_created_rowset_num;
return Status::OK();
}
@@ -1446,4 +1437,89 @@ void Tablet::reset_compaction(CompactionType compaction_type) {
}
}
+Status Tablet::create_initial_rowset(const int64_t req_version) {
+ Status res = Status::OK();
+ if (req_version < 1) {
+ LOG(WARNING) << "init version of tablet should at least 1. req.ver=" << req_version;
+ return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR);
+ }
+ Version version(0, req_version);
+ RowsetSharedPtr new_rowset;
+ do {
+ // there is no data in init rowset, so overlapping info is unknown.
+ std::unique_ptr<RowsetWriter> rs_writer;
+ res = create_rowset_writer(version, VISIBLE, OVERLAP_UNKNOWN, &rs_writer);
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to init rowset writer for tablet " << full_name();
+ break;
+ }
+ res = rs_writer->flush();
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to flush rowset writer for tablet " << full_name();
+ break;
+ }
+
+ new_rowset = rs_writer->build();
+ res = add_rowset(new_rowset);
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to add rowset for tablet " << full_name();
+ break;
+ }
+ } while (0);
+
+ // Unregister index and delete files(index and data) if failed
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to create initial rowset. res=" << res << " version=" << req_version;
+ StorageEngine::instance()->add_unused_rowset(new_rowset);
+ return res;
+ }
+ set_cumulative_layer_point(req_version + 1);
+ return res;
+}
+
+Status Tablet::create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer) {
+ RowsetWriterContext context;
+ context.version = version;
+ context.rowset_state = rowset_state;
+ context.segments_overlap = overlap;
+ _init_context_common_fields(context);
+ return RowsetFactory::create_rowset_writer(context, rowset_writer);
+}
+
+Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id,
+ const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer) {
+ RowsetWriterContext context;
+ context.txn_id = txn_id;
+ context.load_id = load_id;
+ context.rowset_state = rowset_state;
+ context.segments_overlap = overlap;
+ _init_context_common_fields(context);
+ return RowsetFactory::create_rowset_writer(context, rowset_writer);
+}
+
+void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
+ context.rowset_id = StorageEngine::instance()->next_rowset_id();
+ context.tablet_uid = tablet_uid();
+ context.tablet_id = tablet_id();
+ context.partition_id = partition_id();
+ context.tablet_schema_hash = schema_hash();
+ context.rowset_type = tablet_meta()->preferred_rowset_type();
+ // Alpha Rowset will be removed in the future, so if the tablet's preferred rowset type is
+ // alpha rowset, create the new rowset with the storage engine's default rowset type instead.
+ if (context.rowset_type == ALPHA_ROWSET) {
+ context.rowset_type = StorageEngine::instance()->default_rowset_type();
+ }
+ context.path_desc = tablet_path_desc();
+ context.tablet_schema = &(tablet_schema());
+ context.data_dir = data_dir();
+}
+
+Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset) {
+ return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset);
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index cf413a5064..1874afa4e8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -33,6 +33,7 @@
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
+#include "olap/rowset/rowset_writer.h"
#include "olap/tablet_meta.h"
#include "olap/tuple.h"
#include "olap/utils.h"
@@ -97,7 +98,8 @@ public:
int32_t field_index(const std::string& field_name) const;
// operation in rowsets
- Status add_rowset(RowsetSharedPtr rowset, bool need_persist = true);
+ Status add_rowset(RowsetSharedPtr rowset);
+ Status create_initial_rowset(const int64_t version);
void modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
std::vector<RowsetSharedPtr>& to_delete);
@@ -256,6 +258,16 @@ public:
return _tablet_meta->all_beta();
}
+ Status create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer);
+
+ Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id,
+ const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer);
+
+ Status create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset);
+
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
@@ -281,6 +293,7 @@ private:
// When the proportion of empty edges in the adjacency matrix used to represent the version graph
// in the version tracker is greater than the threshold, rebuild the version tracker
bool _reconstruct_version_tracker_if_necessary();
+ void _init_context_common_fields(RowsetWriterContext& context);
public:
static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 2936c9862e..c3a682f283 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -39,7 +39,6 @@
#include "olap/push_handler.h"
#include "olap/reader.h"
#include "olap/rowset/column_data_writer.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/schema_change.h"
#include "olap/tablet.h"
@@ -332,7 +331,7 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
// bool in_restore_mode = request.__isset.in_restore_mode && request.in_restore_mode;
// if (!in_restore_mode && request.__isset.version) {
// create initial rowset before add it to storage engine could omit many locks
- res = _create_initial_rowset_unlocked(request, tablet.get());
+ res = tablet->create_initial_rowset(request.version);
if (!res.ok()) {
LOG(WARNING) << "fail to create initial version for tablet. res=" << res;
break;
@@ -343,26 +342,10 @@ TabletSharedPtr TabletManager::_internal_create_tablet_unlocked(
// because schema change handler depends on it to check whether history data
// convert finished
tablet->set_tablet_state(TabletState::TABLET_NOTREADY);
- // The following two special situations may occur:
- // 1. Because the operating system time jumps, the creation_time of the newly generated table
- // is less than the creation_time of the old table
- // 2. Because the unit of second is unified in the olap engine code,
- // if two operations (such as creating a table, and then immediately altering the table)
- // is less than 1s, then the creation_time of the new table and the old table obtained by alter will be the same
- //
- // When the above two situations occur, in order to be able to distinguish between the new tablet
- // obtained by alter and the old tablet, the creation_time of the new tablet is set to
- // the creation_time of the old tablet increased by 1
- if (tablet->creation_time() <= base_tablet->creation_time()) {
- LOG(WARNING) << "new tablet's create time is less than or equal to old tablet"
- << "new_tablet_create_time=" << tablet->creation_time()
- << ", base_tablet_create_time=" << base_tablet->creation_time();
- int64_t new_creation_time = base_tablet->creation_time() + 1;
- tablet->set_creation_time(new_creation_time);
- }
}
// Add tablet to StorageEngine will make it visible to user
- res = _add_tablet_unlocked(new_tablet_id, tablet, true, false);
+ // Will persist tablet meta
+ res = _add_tablet_unlocked(new_tablet_id, tablet, /*update_meta*/ true, false);
if (!res.ok()) {
LOG(WARNING) << "fail to add tablet to StorageEngine. res=" << res;
break;
@@ -1167,76 +1150,6 @@ void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
<< ", number: " << counter << ", cost(ms): " << cost;
}
-Status TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq& request,
- Tablet* tablet) {
- Status res = Status::OK();
- if (request.version < 1) {
- LOG(WARNING) << "init version of tablet should at least 1. req.ver=" << request.version;
- return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR);
- } else {
- Version version(0, request.version);
- VLOG_NOTICE << "begin to create init version. version=" << version;
- RowsetSharedPtr new_rowset;
- do {
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = tablet->tablet_uid();
- context.tablet_id = tablet->tablet_id();
- context.partition_id = tablet->partition_id();
- context.tablet_schema_hash = tablet->schema_hash();
- context.data_dir = tablet->data_dir();
- if (!request.__isset.storage_format ||
- request.storage_format == TStorageFormat::DEFAULT) {
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- } else if (request.storage_format == TStorageFormat::V1) {
- context.rowset_type = RowsetTypePB::ALPHA_ROWSET;
- } else if (request.storage_format == TStorageFormat::V2) {
- context.rowset_type = RowsetTypePB::BETA_ROWSET;
- } else {
- LOG(ERROR) << "invalid TStorageFormat: " << request.storage_format;
- DCHECK(false);
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- }
- context.path_desc = tablet->tablet_path_desc();
- context.tablet_schema = &(tablet->tablet_schema());
- context.rowset_state = VISIBLE;
- context.version = version;
- // there is no data in init rowset, so overlapping info is unknown.
- context.segments_overlap = OVERLAP_UNKNOWN;
-
- std::unique_ptr<RowsetWriter> builder;
- res = RowsetFactory::create_rowset_writer(context, &builder);
- if (!res.ok()) {
- LOG(WARNING) << "failed to init rowset writer for tablet " << tablet->full_name();
- break;
- }
- res = builder->flush();
- if (!res.ok()) {
- LOG(WARNING) << "failed to flush rowset writer for tablet " << tablet->full_name();
- break;
- }
-
- new_rowset = builder->build();
- res = tablet->add_rowset(new_rowset, false);
- if (!res.ok()) {
- LOG(WARNING) << "failed to add rowset for tablet " << tablet->full_name();
- break;
- }
- } while (0);
-
- // Unregister index and delete files(index and data) if failed
- if (!res.ok()) {
- LOG(WARNING) << "fail to create initial rowset. res=" << res << " version=" << version;
- StorageEngine::instance()->add_unused_rowset(new_rowset);
- return res;
- }
- }
- tablet->set_cumulative_layer_point(request.version + 1);
- // NOTE: should not save tablet meta here, because it will be saved if add to map successfully
-
- return res;
-}
-
Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& request, DataDir* store,
const bool is_schema_change,
const Tablet* base_tablet,
@@ -1276,11 +1189,19 @@ Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& reque
RETURN_NOT_OK_LOG(store->get_shard(&shard_id), "fail to get root path shard");
Status res = TabletMeta::create(request, TabletUid::gen_uid(), shard_id, next_unique_id,
col_idx_to_unique_id, tablet_meta);
-
- if (request.__isset.storage_format && request.storage_format != TStorageFormat::V1) {
- (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET);
- } else {
- (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET);
+ RETURN_NOT_OK(res);
+ if (request.__isset.storage_format) {
+ if (request.storage_format == TStorageFormat::DEFAULT) {
+ (*tablet_meta)
+ ->set_preferred_rowset_type(StorageEngine::instance()->default_rowset_type());
+ } else if (request.storage_format == TStorageFormat::V1) {
+ (*tablet_meta)->set_preferred_rowset_type(ALPHA_ROWSET);
+ } else if (request.storage_format == TStorageFormat::V2) {
+ (*tablet_meta)->set_preferred_rowset_type(BETA_ROWSET);
+ } else {
+ LOG(FATAL) << "invalid TStorageFormat: " << request.storage_format;
+ return Status::OLAPInternalError(OLAP_ERR_CE_CMD_PARAMS_ERROR);
+ }
}
return res;
}
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index a2420a5d81..85b4c644ae 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -154,7 +154,6 @@ private:
bool update_meta, bool keep_files, bool drop_old);
bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
- Status _create_initial_rowset_unlocked(const TCreateTabletReq& request, Tablet* tablet);
Status _drop_tablet_directly_unlocked(TTabletId tablet_id, bool keep_files = false);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 7e1a9b40cc..0cd9202a72 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -62,6 +62,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
tablet_meta_pb.set_tablet_id(tablet_id);
tablet_meta_pb.set_schema_hash(schema_hash);
tablet_meta_pb.set_shard_id(shard_id);
+ // Persist the creation time, but it is not used
tablet_meta_pb.set_creation_time(time(nullptr));
tablet_meta_pb.set_cumulative_layer_point(-1);
tablet_meta_pb.set_tablet_state(PB_RUNNING);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org