You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/13 08:50:13 UTC

[incubator-doris] 01/01: move rowset writer to a single place

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch uniform_rowset_writer
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 49fc596024f9d1963f6c38334c35ef3bf3086424
Author: yiguolei <yi...@gmail.com>
AuthorDate: Wed Apr 13 16:49:48 2022 +0800

    move rowset writer to a single place
---
 be/src/olap/compaction.cpp     | 22 ++-----------
 be/src/olap/data_dir.cpp       |  1 -
 be/src/olap/delta_writer.cpp   | 22 +------------
 be/src/olap/push_handler.cpp   | 49 +++--------------------------
 be/src/olap/schema_change.cpp  | 70 ++++++++----------------------------------
 be/src/olap/tablet.h           |  7 +++++
 be/src/olap/tablet_manager.cpp | 30 ++++++------------
 7 files changed, 36 insertions(+), 165 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 08a49b54b8..1f9937b100 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -18,7 +18,6 @@
 #include "olap/compaction.h"
 
 #include "gutil/strings/substitute.h"
-#include "olap/rowset/rowset_factory.h"
 #include "util/time.h"
 #include "util/trace.h"
 
@@ -144,25 +143,8 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
 }
 
 OLAPStatus Compaction::construct_output_rowset_writer() {
-    RowsetWriterContext context;
-    context.rowset_id = StorageEngine::instance()->next_rowset_id();
-    context.tablet_uid = _tablet->tablet_uid();
-    context.tablet_id = _tablet->tablet_id();
-    context.partition_id = _tablet->partition_id();
-    context.tablet_schema_hash = _tablet->schema_hash();
-    context.data_dir = _tablet->data_dir();
-    context.rowset_type = StorageEngine::instance()->default_rowset_type();
-    if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-        context.rowset_type = BETA_ROWSET;
-    }
-    context.path_desc = _tablet->tablet_path_desc();
-    context.tablet_schema = &(_tablet->tablet_schema());
-    context.rowset_state = VISIBLE;
-    context.version = _output_version;
-    context.segments_overlap = NONOVERLAPPING;
-    // The test results show that one rs writer is low-memory-footprint, there is no need to tracker its mem pool
-    RETURN_NOT_OK(RowsetFactory::create_rowset_writer(context, &_output_rs_writer));
-    return OLAP_SUCCESS;
+    return _tablet->create_rowset_writer(_output_version, VISIBLE, 
+                                         NONOVERLAPPING, &_output_rs_writer);
 }
 
 OLAPStatus Compaction::construct_input_rowset_readers() {
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index d882391bb7..d3f84f46ba 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -37,7 +37,6 @@
 #include "olap/file_helper.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/alpha_rowset_meta.h"
-#include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_meta_manager.h"
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 60ea864282..0bf6610a6c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -20,7 +20,6 @@
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
-#include "olap/rowset/rowset_factory.h"
 #include "olap/schema.h"
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
@@ -114,26 +113,7 @@ OLAPStatus DeltaWriter::init() {
                                                                   _req.txn_id, _req.load_id));
     }
 
-    RowsetWriterContext writer_context;
-    writer_context.rowset_id = _storage_engine->next_rowset_id();
-    writer_context.tablet_uid = _tablet->tablet_uid();
-    writer_context.tablet_id = _req.tablet_id;
-    writer_context.partition_id = _req.partition_id;
-    writer_context.tablet_schema_hash = _req.schema_hash;
-    if (_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-        writer_context.rowset_type = BETA_ROWSET;
-    } else {
-        writer_context.rowset_type = ALPHA_ROWSET;
-    }
-    writer_context.path_desc = _tablet->tablet_path_desc();
-    writer_context.tablet_schema = &(_tablet->tablet_schema());
-    writer_context.rowset_state = PREPARED;
-    writer_context.txn_id = _req.txn_id;
-    writer_context.load_id = _req.load_id;
-    writer_context.segments_overlap = OVERLAPPING;
-    writer_context.data_dir = _tablet->data_dir();
-    RETURN_NOT_OK(RowsetFactory::create_rowset_writer(writer_context, &_rowset_writer));
-
+    RETURN_NOT_OK(_tablet->create_rowset_writer(_req.txn_id, _req.load_id, PREPARED, OVERLAPPING, &_rowset_writer));
     _tablet_schema = &(_tablet->tablet_schema());
     _schema.reset(new Schema(*_tablet_schema));
     _reset_mem_table();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 234db7ad63..fd6c41babe 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -25,7 +25,6 @@
 #include "common/status.h"
 #include "exec/parquet_scanner.h"
 #include "olap/row.h"
-#include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_id_generator.h"
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/schema_change.h"
@@ -221,29 +220,11 @@ OLAPStatus PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr
         // 1. init RowsetBuilder of cur_tablet for current push
         VLOG_NOTICE << "init rowset builder. tablet=" << cur_tablet->full_name()
                     << ", block_row_size=" << cur_tablet->num_rows_per_row_block();
-        RowsetWriterContext context;
-        context.rowset_id = StorageEngine::instance()->next_rowset_id();
-        context.tablet_uid = cur_tablet->tablet_uid();
-        context.tablet_id = cur_tablet->tablet_id();
-        context.partition_id = _request.partition_id;
-        context.tablet_schema_hash = cur_tablet->schema_hash();
-        context.data_dir = cur_tablet->data_dir();
-        context.rowset_type = StorageEngine::instance()->default_rowset_type();
-        if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-            context.rowset_type = BETA_ROWSET;
-        }
-        context.path_desc = cur_tablet->tablet_path_desc();
-        context.tablet_schema = &(cur_tablet->tablet_schema());
-        context.rowset_state = PREPARED;
-        context.txn_id = _request.transaction_id;
-        context.load_id = load_id;
         // although the spark load output files are fully sorted,
         // but it depends on thirparty implementation, so we conservatively
         // set this value to OVERLAP_UNKNOWN
-        context.segments_overlap = OVERLAP_UNKNOWN;
-
-        std::unique_ptr<RowsetWriter> rowset_writer;
-        res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+        res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED, 
+                                               OVERLAP_UNKNOWN, &rowset_writer);
         if (OLAP_SUCCESS != res) {
             LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
                          << ", txn_id=" << _request.transaction_id << ", res=" << res;
@@ -407,30 +388,8 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new
         }
 
         // 2. init RowsetBuilder of cur_tablet for current push
-        VLOG_NOTICE << "init RowsetBuilder.";
-        RowsetWriterContext context;
-        context.rowset_id = StorageEngine::instance()->next_rowset_id();
-        context.tablet_uid = cur_tablet->tablet_uid();
-        context.tablet_id = cur_tablet->tablet_id();
-        context.partition_id = _request.partition_id;
-        context.tablet_schema_hash = cur_tablet->schema_hash();
-        context.data_dir = cur_tablet->data_dir();
-        context.rowset_type = StorageEngine::instance()->default_rowset_type();
-        if (cur_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-            context.rowset_type = BETA_ROWSET;
-        }
-        context.path_desc = cur_tablet->tablet_path_desc();
-        context.tablet_schema = &(cur_tablet->tablet_schema());
-        context.rowset_state = PREPARED;
-        context.txn_id = _request.transaction_id;
-        context.load_id = load_id;
-        // although the hadoop load output files are fully sorted,
-        // but it depends on thirparty implementation, so we conservatively
-        // set this value to OVERLAP_UNKNOWN
-        context.segments_overlap = OVERLAP_UNKNOWN;
-
-        std::unique_ptr<RowsetWriter> rowset_writer;
-        res = RowsetFactory::create_rowset_writer(context, &rowset_writer);
+        res = cur_tablet->create_rowset_writer(_request.transaction_id, load_id, PREPARED, 
+                                               OVERLAP_UNKNOWN, &rowset_writer);
         if (OLAP_SUCCESS != res) {
             LOG(WARNING) << "failed to init rowset writer, tablet=" << cur_tablet->full_name()
                          << ", txn_id=" << _request.transaction_id << ", res=" << res;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ebe69f8b75..da062f134d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -29,7 +29,6 @@
 #include "olap/row.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_id_generator.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
@@ -1315,25 +1314,11 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
     uint64_t merged_rows = 0;
     RowBlockMerger merger(new_tablet);
 
-    RowsetWriterContext context;
-    context.rowset_id = StorageEngine::instance()->next_rowset_id();
-    context.tablet_uid = new_tablet->tablet_uid();
-    context.tablet_id = new_tablet->tablet_id();
-    context.partition_id = new_tablet->partition_id();
-    context.tablet_schema_hash = new_tablet->schema_hash();
-    context.rowset_type = new_rowset_type;
-    context.path_desc = new_tablet->tablet_path_desc();
-    context.tablet_schema = &(new_tablet->tablet_schema());
-    context.data_dir = new_tablet->data_dir();
-    context.rowset_state = VISIBLE;
-    context.version = version;
-    context.segments_overlap = segments_overlap;
-
     VLOG_NOTICE << "init rowset builder. tablet=" << new_tablet->full_name()
                 << ", block_row_size=" << new_tablet->num_rows_per_row_block();
 
     std::unique_ptr<RowsetWriter> rowset_writer;
-    if (RowsetFactory::create_rowset_writer(context, &rowset_writer) != OLAP_SUCCESS) {
+    if (new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer) != OLAP_SUCCESS) {
         return false;
     }
 
@@ -1700,28 +1685,14 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
     RowsetReaderSharedPtr rowset_reader;
     RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
     RETURN_NOT_OK(rowset_reader->init(&reader_context));
-
-    RowsetWriterContext writer_context;
-    writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
-    writer_context.tablet_uid = new_tablet->tablet_uid();
-    writer_context.tablet_id = new_tablet->tablet_id();
-    writer_context.partition_id = (*base_rowset)->partition_id();
-    writer_context.tablet_schema_hash = new_tablet->schema_hash();
-    writer_context.data_dir = new_tablet->data_dir();
-    writer_context.rowset_type = (*base_rowset)->rowset_meta()->rowset_type();
-    if (new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-        writer_context.rowset_type = BETA_ROWSET;
-    }
-    writer_context.path_desc = new_tablet->tablet_path_desc();
-    writer_context.tablet_schema = &(new_tablet->tablet_schema());
-    writer_context.rowset_state = PREPARED;
-    writer_context.txn_id = (*base_rowset)->txn_id();
-    writer_context.load_id.set_hi((*base_rowset)->load_id().hi());
-    writer_context.load_id.set_lo((*base_rowset)->load_id().lo());
-    writer_context.segments_overlap = (*base_rowset)->rowset_meta()->segments_overlap();
-
+    PUniqueId load_id;
+    load_id.set_hi((*base_rowset)->load_id().hi());
+    load_id.set_lo((*base_rowset)->load_id().lo());
     std::unique_ptr<RowsetWriter> rowset_writer;
     RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+    RETURN_NOT_OK(new_tablet->create_rowset_writer((*base_rowset)->txn_id(), load_id, PREPARED, 
+                                     (*base_rowset)->rowset_meta()->segments_overlap(),
+                                     &rowset_writer));
 
     if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
                                      base_tablet)) != OLAP_SUCCESS) {
@@ -1843,29 +1814,12 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
         // As long as there is a new_table as running, ref table is set as running
         // NOTE If the first sub_table fails first, it will continue to go as normal here
         TabletSharedPtr new_tablet = sc_params.new_tablet;
-
-        RowsetWriterContext writer_context;
-        writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
-        writer_context.tablet_uid = new_tablet->tablet_uid();
-        writer_context.tablet_id = new_tablet->tablet_id();
-        writer_context.partition_id = new_tablet->partition_id();
-        writer_context.tablet_schema_hash = new_tablet->schema_hash();
-        writer_context.data_dir = new_tablet->data_dir();
-        // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
-        writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
-        if (sc_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
-            // Use beta rowset to do schema change
-            // And in this case, linked schema change will not be used.
-            writer_context.rowset_type = BETA_ROWSET;
-        }
-        writer_context.path_desc = new_tablet->tablet_path_desc();
-        writer_context.tablet_schema = &(new_tablet->tablet_schema());
-        writer_context.rowset_state = VISIBLE;
-        writer_context.version = rs_reader->version();
-        writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
-
+        // When the tablet creates a new rowset writer it may change the rowset type; in that case
+        // linked schema change will not be used.
         std::unique_ptr<RowsetWriter> rowset_writer;
-        OLAPStatus status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+        OLAPStatus status = new_tablet->create_rowset_writer(rs_reader->version(), VISIBLE, 
+                                                             rs_reader->rowset()->rowset_meta()->segments_overlap(),
+                                                             &rowset_writer);
         if (status != OLAP_SUCCESS) {
             res = OLAP_ERR_ROWSET_BUILDER_INIT;
             goto PROCESS_ALTER_EXIT;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 75235c96c8..74cbdc7a80 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -255,6 +255,13 @@ public:
         return _tablet_meta->all_beta();
     }
 
+    Status create_rowset_writer(Version& version, const RowsetStatePB& rowset_state, 
+                                const SegmentsOverlapPB& overlap, std::unique_ptr<RowsetWriter>* rowset_writer);
+
+    Status create_rowset_writer(const int64_t& txn_id, const PUniqueId& load_id, 
+                                const RowsetStatePB& rowset_state, const SegmentsOverlapPB& overlap, 
+                                std::unique_ptr<RowsetWriter>* rowset_writer);
+
 private:
     OLAPStatus _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) const;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 0d5b9714ab..571ed40401 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -39,7 +39,6 @@
 #include "olap/push_handler.h"
 #include "olap/reader.h"
 #include "olap/rowset/column_data_writer.h"
-#include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_id_generator.h"
 #include "olap/schema_change.h"
 #include "olap/tablet.h"
@@ -1097,34 +1096,25 @@ OLAPStatus TabletManager::_create_initial_rowset_unlocked(const TCreateTabletReq
         VLOG_NOTICE << "begin to create init version. version=" << version;
         RowsetSharedPtr new_rowset;
         do {
-            RowsetWriterContext context;
-            context.rowset_id = StorageEngine::instance()->next_rowset_id();
-            context.tablet_uid = tablet->tablet_uid();
-            context.tablet_id = tablet->tablet_id();
-            context.partition_id = tablet->partition_id();
-            context.tablet_schema_hash = tablet->schema_hash();
-            context.data_dir = tablet->data_dir();
+            RowsetTypePB rowset_type = BETA_ROWSET;
             if (!request.__isset.storage_format ||
                 request.storage_format == TStorageFormat::DEFAULT) {
-                context.rowset_type = StorageEngine::instance()->default_rowset_type();
+                rowset_type = StorageEngine::instance()->default_rowset_type();
             } else if (request.storage_format == TStorageFormat::V1) {
-                context.rowset_type = RowsetTypePB::ALPHA_ROWSET;
+                rowset_type = RowsetTypePB::ALPHA_ROWSET;
             } else if (request.storage_format == TStorageFormat::V2) {
-                context.rowset_type = RowsetTypePB::BETA_ROWSET;
+                rowset_type = RowsetTypePB::BETA_ROWSET;
             } else {
                 LOG(ERROR) << "invalid TStorageFormat: " << request.storage_format;
-                DCHECK(false);
-                context.rowset_type = StorageEngine::instance()->default_rowset_type();
+                return OLAP_ERR_CE_CMD_PARAMS_ERROR;
+            }
+            if (rowset_type != BETA_ROWSET) {
+                LOG(WARNING) << "Only support beta rowset now, alpha rowset is not supported any more";
+                return OLAP_ERR_CE_CMD_PARAMS_ERROR;
             }
-            context.path_desc = tablet->tablet_path_desc();
-            context.tablet_schema = &(tablet->tablet_schema());
-            context.rowset_state = VISIBLE;
-            context.version = version;
             // there is no data in init rowset, so overlapping info is unknown.
-            context.segments_overlap = OVERLAP_UNKNOWN;
-
             std::unique_ptr<RowsetWriter> builder;
-            res = RowsetFactory::create_rowset_writer(context, &builder);
+            res = tablet->create_rowset_writer(version, VISIBLE, OVERLAP_UNKNOWN, &builder);
             if (res != OLAP_SUCCESS) {
                 LOG(WARNING) << "failed to init rowset writer for tablet " << tablet->full_name();
                 break;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org