You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/13 08:50:13 UTC
[incubator-doris] 01/01: move rowset writer to a single place
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch uniform_rowset_writer
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 49fc596024f9d1963f6c38334c35ef3bf3086424
Author: yiguolei <yi...@gmail.com>
AuthorDate: Wed Apr 13 16:49:48 2022 +0800
move rowset writer to a single place
---
be/src/olap/compaction.cpp | 22 ++-----------
be/src/olap/data_dir.cpp | 1 -
be/src/olap/delta_writer.cpp | 22 +------------
be/src/olap/push_handler.cpp | 49 +++--------------------------
be/src/olap/schema_change.cpp | 70 ++++++++----------------------------------
be/src/olap/tablet.h | 7 +++++
be/src/olap/tablet_manager.cpp | 30 ++++++------------
7 files changed, 36 insertions(+), 165 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 08a49b54b8..1f9937b100 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -18,7 +18,6 @@
#include "olap/compaction.h"
#include "gutil/strings/substitute.h"
-#include "olap/rowset/rowset_factory.h"
#include "util/time.h"
#include "util/trace.h"
@@ -144,25 +143,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
}
OLAPStatus Compaction::construct_output_rowset_writer() {
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = _tablet->tablet_uid();
- context.tablet_id = _tablet->tablet_id();
- context.partition_id = _tablet->partition_id();
- context.tablet_schema_hash = _tablet->schema_hash();
- context.data_dir = _tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = _tablet->tablet_path_desc();
- context.tablet_schema = &(_tablet->tablet_schema());
- context.rowset_state = VISIBLE;
- context.version = _output_version;
- context.segments_overlap = NONOVERLAPPING;
- // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool
- RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer));
- return OLAP_SUCCESS;
+ return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING,
+ &_output_rs_writer);
}
OLAPStatus Compaction::construct_input_rowset_readers() {
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index d882391bb7..d3f84f46ba 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -37,7 +37,6 @@
#include "olap/file_helper.h"
#include "olap/olap_define.h"
#include "olap/rowset/alpha_rowset_meta.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta_manager.h"
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 60ea864282..0bf6610a6c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -20,7 +20,6 @@
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
@@ -114,26 +113,8 @@ OLAPStatus DeltaWriter::init() {
_req.txn_id, _req.load_id));
}
- RowsetWriterContext writer_context;
- writer_context.rowset_id = _storage_engine->next_rowset_id();
- writer_context.tablet_uid = _tablet->tablet_uid();
- writer_context.tablet_id = _req.tablet_id;
- writer_context.partition_id = _req.partition_id;
- writer_context.tablet_schema_hash = _req.schema_hash;
- if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- writer_context.rowset_type = BETA_ROWSET;
- } else {
- writer_context.rowset_type = ALPHA_ROWSET;
- }
- writer_context.path_desc = _tablet->tablet_path_desc();
- writer_context.tablet_schema = &(_tablet->tablet_schema());
- writer_context.rowset_state = PREPARED;
- writer_context.txn_id = _req.txn_id;
- writer_context.load_id = _req.load_id;
- writer_context.segments_overlap = OVERLAPPING;
- writer_context.data_dir = _tablet->data_dir();
- RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer));
-
+ RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING,
+ &_rowset_writer));
_tablet_schema = &(_tablet->tablet_schema());
_schema.reset(new Schema(*_tablet_schema));
_reset_mem_table();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 234db7ad63..fd6c41babe 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -25,7 +25,6 @@
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
@@ -221,29 +220,12 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
// 1. init RowsetBuilder of cur_tablet for current push
VLOG_NOTICE << "init rowset builder. tablet=" << cur_tablet->full_name()
<< ", block_row_size=" << cur_tablet->num_rows_per_row_block();
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = cur_tablet->tablet_uid();
- context.tablet_id = cur_tablet->tablet_id();
- context.partition_id = _request.partition_id;
- context.tablet_schema_hash = cur_tablet->schema_hash();
- context.data_dir = cur_tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = cur_tablet->tablet_path_desc();
- context.tablet_schema = &(cur_tablet->tablet_schema());
- context.rowset_state = PREPARED;
- context.txn_id = _request.transaction_id;
- context.load_id = load_id;
// although the spark load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
- context.segments_overlap = OVERLAP_UNKNOWN;
-
std::unique_ptr<RowsetWriter> rowset_writer;
- res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+ res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
+ OVERLAP_UNKNOWN, &rowset_writer);
if (OLAP_SUCCESS != res) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
@@ -407,30 +389,12 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new
}
// 2. init RowsetBuilder of cur_tablet for current push
- VLOG_NOTICE << "init RowsetBuilder.";
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = cur_tablet->tablet_uid();
- context.tablet_id = cur_tablet->tablet_id();
- context.partition_id = _request.partition_id;
- context.tablet_schema_hash = cur_tablet->schema_hash();
- context.data_dir = cur_tablet->data_dir();
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
- if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- context.rowset_type = BETA_ROWSET;
- }
- context.path_desc = cur_tablet->tablet_path_desc();
- context.tablet_schema = &(cur_tablet->tablet_schema());
- context.rowset_state = PREPARED;
- context.txn_id = _request.transaction_id;
- context.load_id = load_id;
// although the hadoop load output files are fully sorted,
// but it depends on thirparty implementation, so we conservatively
// set this value to OVERLAP_UNKNOWN
- context.segments_overlap = OVERLAP_UNKNOWN;
-
std::unique_ptr<RowsetWriter> rowset_writer;
- res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+ res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED,
+ OVERLAP_UNKNOWN, &rowset_writer);
if (OLAP_SUCCESS != res) {
LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
<< ", txn_id=" << _request.transaction_id << ", res=" << res;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ebe69f8b75..da062f134d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -29,7 +29,6 @@
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@@ -1315,25 +1314,12 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
uint64_t merged_rows = 0;
RowBlockMerger merger(new_tablet);
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = new_tablet->tablet_uid();
- context.tablet_id = new_tablet->tablet_id();
- context.partition_id = new_tablet->partition_id();
- context.tablet_schema_hash = new_tablet->schema_hash();
- context.rowset_type = new_rowset_type;
- context.path_desc = new_tablet->tablet_path_desc();
- context.tablet_schema = &(new_tablet->tablet_schema());
- context.data_dir = new_tablet->data_dir();
- context.rowset_state = VISIBLE;
- context.version = version;
- context.segments_overlap = segments_overlap;
-
VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
<< ", block_row_size=" << new_tablet->num_rows_per_row_block();
std::unique_ptr<RowsetWriter> rowset_writer;
- if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != OLAP_SUCCESS) {
+ if (new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer) !=
+ OLAP_SUCCESS) {
return false;
}
@@ -1700,28 +1686,13 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
RETURN_NOT_OK(rowset_reader->init(&reader_context));
-
- RowsetWriterContext writer_context;
- writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
- writer_context.tablet_uid = new_tablet->tablet_uid();
- writer_context.tablet_id = new_tablet->tablet_id();
- writer_context.partition_id = (*base_rowset)->partition_id();
- writer_context.tablet_schema_hash = new_tablet->schema_hash();
- writer_context.data_dir = new_tablet->data_dir();
- writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
- if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- writer_context.rowset_type = BETA_ROWSET;
- }
- writer_context.path_desc = new_tablet->tablet_path_desc();
- writer_context.tablet_schema = &(new_tablet->tablet_schema());
- writer_context.rowset_state = PREPARED;
- writer_context.txn_id = (*base_rowset)->txn_id();
- writer_context.load_id.set_hi((*base_rowset)->load_id().hi());
- writer_context.load_id.set_lo((*base_rowset)->load_id().lo());
- writer_context.segments_overlap = (*base_rowset)->rowset_meta()->segments_overlap();
-
+ PUniqueId load_id;
+ load_id.set_hi((*base_rowset)->load_id().hi());
+ load_id.set_lo((*base_rowset)->load_id().lo());
std::unique_ptr<RowsetWriter> rowset_writer;
- RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+ RETURN_NOT_OK(new_tablet->create_rowset_writer((*base_rowset)->txn_id(), load_id, PREPARED,
+ (*base_rowset)->rowset_meta()->segments_overlap(),
+ &rowset_writer));
if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
base_tablet)) != OLAP_SUCCESS) {
@@ -1843,29 +1814,12 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// As long as there is a new_table as running, ref table is set as running
// NOTE If the first sub_table fails first, it will continue to go as normal here
TabletSharedPtr new_tablet = sc_params.new_tablet;
-
- RowsetWriterContext writer_context;
- writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
- writer_context.tablet_uid = new_tablet->tablet_uid();
- writer_context.tablet_id = new_tablet->tablet_id();
- writer_context.partition_id = new_tablet->partition_id();
- writer_context.tablet_schema_hash = new_tablet->schema_hash();
- writer_context.data_dir = new_tablet->data_dir();
- // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
- writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
- if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
- // Use beta rowset to do schema change
- // And in this case, linked schema change will not be used.
- writer_context.rowset_type = BETA_ROWSET;
- }
- writer_context.path_desc = new_tablet->tablet_path_desc();
- writer_context.tablet_schema = &(new_tablet->tablet_schema());
- writer_context.rowset_state = VISIBLE;
- writer_context.version = rs_reader->version();
- writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
-
+ // The tablet picks the rowset type when creating the writer, so it may differ from the
+ // type of the rowset being converted; in that case linked schema change is not used.
std::unique_ptr<RowsetWriter> rowset_writer;
- OLAPStatus status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+ OLAPStatus status = new_tablet->create_rowset_writer(rs_reader->version(), VISIBLE,
+ rs_reader->rowset()->rowset_meta()->segments_overlap(),
+ &rowset_writer);
if (status != OLAP_SUCCESS) {
res = OLAP_ERR_ROWSET_BUILDER_INIT;
goto PROCESS_ALTER_EXIT;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 75235c96c8..74cbdc7a80 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -255,6 +255,19 @@ public:
return _tablet_meta->all_beta();
}
+ // Creates a writer for a new rowset of this tablet with the given `version`,
+ // e.g. a compaction or schema change output, or the tablet's initial rowset.
+ OLAPStatus create_rowset_writer(const Version& version, const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer);
+
+ // Creates a writer for a new rowset of this tablet that belongs to transaction `txn_id`
+ // (load `load_id`); unlike the overload above, no version is passed.
+ OLAPStatus create_rowset_writer(int64_t txn_id, const PUniqueId& load_id,
+ const RowsetStatePB& rowset_state,
+ const SegmentsOverlapPB& overlap,
+ std::unique_ptr<RowsetWriter>* rowset_writer);
+
private:
OLAPStatus _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 0d5b9714ab..571ed40401 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -39,7 +39,6 @@
#include "olap/push_handler.h"
#include "olap/reader.h"
#include "olap/rowset/column_data_writer.h"
-#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
#include "olap/schema_change.h"
#include "olap/tablet.h"
@@ -1097,34 +1096,27 @@ OLAPStatus TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq
VLOG_NOTICE << "begin to create init version. version=" << version;
RowsetSharedPtr new_rowset;
do {
- RowsetWriterContext context;
- context.rowset_id = StorageEngine::instance()->next_rowset_id();
- context.tablet_uid = tablet->tablet_uid();
- context.tablet_id = tablet->tablet_id();
- context.partition_id = tablet->partition_id();
- context.tablet_schema_hash = tablet->schema_hash();
- context.data_dir = tablet->data_dir();
+ RowsetTypePB rowset_type = BETA_ROWSET;
if (!request.__isset.storage_format ||
request.storage_format == TStorageFormat::DEFAULT) {
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
+ rowset_type = StorageEngine::instance()->default_rowset_type();
} else if (request.storage_format == TStorageFormat::V1) {
- context.rowset_type = RowsetTypePB::ALPHA_ROWSET;
+ rowset_type = RowsetTypePB::ALPHA_ROWSET;
} else if (request.storage_format == TStorageFormat::V2) {
- context.rowset_type = RowsetTypePB::BETA_ROWSET;
+ rowset_type = RowsetTypePB::BETA_ROWSET;
} else {
LOG(ERROR) << "invalid TStorageFormat: " << request.storage_format;
- DCHECK(false);
- context.rowset_type = StorageEngine::instance()->default_rowset_type();
+ res = OLAP_ERR_CE_CMD_PARAMS_ERROR;
+ break;
+ }
+ if (rowset_type != BETA_ROWSET) {
+ LOG(WARNING) << "Only support beta rowset now, alpha rowset is not supported any more";
+ res = OLAP_ERR_CE_CMD_PARAMS_ERROR;
+ break;
}
- context.path_desc = tablet->tablet_path_desc();
- context.tablet_schema = &(tablet->tablet_schema());
- context.rowset_state = VISIBLE;
- context.version = version;
// there is no data in init rowset, so overlapping info is unknown.
- context.segments_overlap = OVERLAP_UNKNOWN;
-
std::unique_ptr<RowsetWriter> builder;
- res = RowsetFactory::create_rowset_writer(context, &builder);
+ res = tablet->create_rowset_writer(version, VISIBLE, OVERLAP_UNKNOWN, &builder);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to init rowset writer for tablet " << tablet->full_name();
break;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org