You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/16 02:50:12 UTC
[incubator-doris] branch master updated: [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5805f8077f [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
5805f8077f is described below
commit 5805f8077f77295f2dd58a3fc7b6660c7fb24262
Author: Pxl <95...@qq.com>
AuthorDate: Thu Jun 16 10:50:08 2022 +0800
[Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
---
be/src/agent/task_worker_pool.cpp | 6 +-
be/src/common/config.h | 3 +
be/src/olap/push_handler.cpp | 32 +-
be/src/olap/push_handler.h | 43 +-
be/src/olap/reader.cpp | 30 +-
be/src/olap/reader.h | 12 +-
be/src/olap/rowset/beta_rowset_reader.h | 3 +-
be/src/olap/schema_change.cpp | 493 ++++++++-------------
be/src/olap/schema_change.h | 166 ++++---
be/src/olap/tablet.cpp | 3 +-
be/src/olap/task/engine_alter_tablet_task.cpp | 9 +-
be/src/olap/task/engine_alter_tablet_task.h | 8 +-
.../olap/task/engine_storage_migration_task_v2.cpp | 4 +-
be/src/olap/tuple_reader.cpp | 6 +-
be/src/olap/tuple_reader.h | 15 -
be/src/runtime/descriptors.cpp | 15 +-
be/src/runtime/descriptors.h | 55 ++-
be/src/runtime/runtime_state.cpp | 14 +
be/src/runtime/runtime_state.h | 25 +-
be/src/runtime/thread_mem_tracker_mgr.h | 3 +-
be/src/vec/columns/column.h | 14 +-
be/src/vec/olap/block_reader.cpp | 7 +-
be/src/vec/olap/block_reader.h | 7 -
be/test/olap/schema_change_test.cpp | 6 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 23 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 51 ++-
.../java/org/apache/doris/analysis/Analyzer.java | 7 +-
.../java/org/apache/doris/analysis/InsertStmt.java | 6 +-
.../doris/load/loadv2/LoadingTaskPlanner.java | 6 +-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 6 +-
.../apache/doris/load/update/UpdatePlanner.java | 6 +-
.../org/apache/doris/task/AlterReplicaTask.java | 28 +-
.../doris/planner/StreamLoadScanNodeTest.java | 90 +---
gensrc/thrift/AgentService.thrift | 1 +
34 files changed, 552 insertions(+), 651 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 79953f6709..06ca61eef0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -535,7 +535,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
error_msgs.push_back("The data quality does not satisfy, please check your data. ");
}
- status = Status::DataQualityError("The data quality does not satisfy");
+ status = sc_status;
} else {
status = Status::OK();
}
@@ -620,7 +620,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
agent_task_req = _tasks[index];
push_req = agent_task_req.push_req;
_tasks.erase(_tasks.begin() + index);
- } while (0);
+ } while (false);
if (index < 0) {
// there is no high priority task in queue
@@ -1764,7 +1764,7 @@ void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_t
if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
error_msgs.push_back("The data quality does not satisfy, please check your data. ");
}
- status = Status::DataQualityError("The data quality does not satisfy");
+ status = sc_status;
} else {
status = Status::OK();
}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8a922f076e..4885c17193 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -244,6 +244,9 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "true");
+// whether enable vectorized schema change
+CONF_Bool(enable_vectorized_alter_table, "false");
+
// check the configuration of auto compaction in seconds when auto compaction disabled
CONF_mInt32(check_auto_compaction_interval_seconds, "5");
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 02e0d46dc9..4ab7ecfddd 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -22,6 +22,7 @@
#include <iostream>
#include <sstream>
+#include "common/object_pool.h"
#include "common/status.h"
#include "exec/parquet_scanner.h"
#include "olap/row.h"
@@ -32,11 +33,6 @@
#include "olap/tablet.h"
#include "runtime/exec_env.h"
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
namespace doris {
// Process push command, the main logical is as follows:
@@ -60,6 +56,9 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
Status res = Status::OK();
_request = request;
+
+ DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);
+
std::vector<TabletVars> tablet_vars(1);
tablet_vars[0].tablet = tablet;
res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec);
@@ -315,16 +314,15 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
// 5. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
- auto schema_change_handler = SchemaChangeHandler::instance();
- res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
- new_rowset);
+ res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
+ new_rowset, *_desc_tbl);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
- } while (0);
+ } while (false);
VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
<< ", processed_rows" << num_rows;
@@ -456,16 +454,15 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab
// 7. Convert data for schema change tables
VLOG_TRACE << "load to related tables of schema_change if possible.";
if (new_tablet != nullptr) {
- auto schema_change_handler = SchemaChangeHandler::instance();
- res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
- new_rowset);
+ res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
+ new_rowset, *_desc_tbl);
if (!res.ok()) {
LOG(WARNING) << "failed to change schema version for delta."
<< "[res=" << res << " new_tablet='" << new_tablet->full_name()
<< "']";
}
}
- } while (0);
+ } while (false);
SAFE_DELETE(reader);
VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
@@ -502,7 +499,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {
return reader;
}
-BinaryReader::BinaryReader() : IBinaryReader(), _row_buf(nullptr), _row_buf_size(0) {}
+BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}
Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
Status res = Status::OK();
@@ -527,7 +524,7 @@ Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
_tablet = tablet;
_ready = true;
- } while (0);
+ } while (false);
if (!res.ok()) {
SAFE_DELETE_ARRAY(_row_buf);
@@ -637,8 +634,7 @@ Status BinaryReader::next(RowCursor* row) {
}
LzoBinaryReader::LzoBinaryReader()
- : IBinaryReader(),
- _row_buf(nullptr),
+ : _row_buf(nullptr),
_row_compressed_buf(nullptr),
_row_info_buf(nullptr),
_max_row_num(0),
@@ -670,7 +666,7 @@ Status LzoBinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
_tablet = tablet;
_ready = true;
- } while (0);
+ } while (false);
if (!res.ok()) {
SAFE_DELETE_ARRAY(_row_info_buf);
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 13da3018d4..a290eb01c7 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -45,10 +45,10 @@ struct TabletVars {
class PushHandler {
public:
- typedef std::vector<ColumnMapping> SchemaMapping;
+ using SchemaMapping = std::vector<ColumnMapping>;
- PushHandler() {}
- ~PushHandler() {}
+ PushHandler() = default;
+ ~PushHandler() = default;
// Load local data file into specified tablet.
Status process_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
@@ -80,6 +80,9 @@ private:
// mainly tablet_id, version and delta file path
TPushReq _request;
+ ObjectPool _pool;
+ DescriptorTbl* _desc_tbl = nullptr;
+
int64_t _write_bytes = 0;
int64_t _write_rows = 0;
DISALLOW_COPY_AND_ASSIGN(PushHandler);
@@ -88,7 +91,7 @@ private:
// package FileHandlerWithBuf to read header of dpp output file
class BinaryFile : public FileHandlerWithBuf {
public:
- BinaryFile() {}
+ BinaryFile() = default;
virtual ~BinaryFile() { close(); }
Status init(const char* path);
@@ -107,7 +110,7 @@ private:
class IBinaryReader {
public:
static IBinaryReader* create(bool need_decompress);
- virtual ~IBinaryReader() {}
+ virtual ~IBinaryReader() = default;
virtual Status init(TabletSharedPtr tablet, BinaryFile* file) = 0;
virtual Status finalize() = 0;
@@ -139,14 +142,14 @@ protected:
class BinaryReader : public IBinaryReader {
public:
explicit BinaryReader();
- virtual ~BinaryReader() { finalize(); }
+ ~BinaryReader() override { finalize(); }
- virtual Status init(TabletSharedPtr tablet, BinaryFile* file);
- virtual Status finalize();
+ Status init(TabletSharedPtr tablet, BinaryFile* file) override;
+ Status finalize() override;
- virtual Status next(RowCursor* row);
+ Status next(RowCursor* row) override;
- virtual bool eof() { return _curr >= _content_len; }
+ bool eof() override { return _curr >= _content_len; }
private:
char* _row_buf;
@@ -156,20 +159,20 @@ private:
class LzoBinaryReader : public IBinaryReader {
public:
explicit LzoBinaryReader();
- virtual ~LzoBinaryReader() { finalize(); }
+ ~LzoBinaryReader() override { finalize(); }
- virtual Status init(TabletSharedPtr tablet, BinaryFile* file);
- virtual Status finalize();
+ Status init(TabletSharedPtr tablet, BinaryFile* file) override;
+ Status finalize() override;
- virtual Status next(RowCursor* row);
+ Status next(RowCursor* row) override;
- virtual bool eof() { return _curr >= _content_len && _row_num == 0; }
+ bool eof() override { return _curr >= _content_len && _row_num == 0; }
private:
Status _next_block();
- typedef uint32_t RowNumType;
- typedef uint64_t CompressedSizeType;
+ using RowNumType = uint32_t;
+ using CompressedSizeType = uint64_t;
char* _row_buf;
char* _row_compressed_buf;
@@ -184,7 +187,7 @@ private:
class PushBrokerReader {
public:
PushBrokerReader() : _ready(false), _eof(false), _fill_tuple(false) {}
- ~PushBrokerReader() {}
+ ~PushBrokerReader() = default;
Status init(const Schema* schema, const TBrokerScanRange& t_scan_range,
const TDescriptorTable& t_desc_tbl);
@@ -195,8 +198,8 @@ public:
_ready = false;
return Status::OK();
}
- bool eof() { return _eof; }
- bool is_fill_tuple() { return _fill_tuple; }
+ bool eof() const { return _eof; }
+ bool is_fill_tuple() const { return _fill_tuple; }
MemPool* mem_pool() { return _mem_pool.get(); }
private:
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 0d59b7f969..497c014e46 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -23,23 +23,19 @@
#include <charconv>
#include <unordered_set>
+#include "common/status.h"
#include "olap/bloom_filter_predicate.h"
#include "olap/comparison_predicate.h"
#include "olap/in_list_predicate.h"
#include "olap/null_predicate.h"
+#include "olap/olap_common.h"
#include "olap/row.h"
-#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/rowset/column_data.h"
#include "olap/schema.h"
-#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/mem_pool.h"
-#include "runtime/string_value.hpp"
#include "util/date_func.h"
#include "util/mem_util.hpp"
-#include "vec/olap/vcollect_iterator.h"
using std::nothrow;
using std::set;
@@ -313,7 +309,8 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
}
VLOG_NOTICE << "return column is empty, using full column as default.";
} else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION ||
- read_params.reader_type == READER_BASE_COMPACTION) &&
+ read_params.reader_type == READER_BASE_COMPACTION ||
+ read_params.reader_type == READER_ALTER_TABLE) &&
!read_params.return_columns.empty()) {
_return_columns = read_params.return_columns;
for (auto id : read_params.return_columns) {
@@ -834,12 +831,6 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
return Status::OK();
}
- Status ret;
- {
- std::shared_lock rdlock(_tablet->get_header_lock());
- ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
- read_params.version.second, this);
- }
// Only BASE_COMPACTION need set filter_delete = true
// other reader type:
// QUERY will filter the row in query layer to keep right result use where clause.
@@ -847,7 +838,18 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
if (read_params.reader_type == READER_BASE_COMPACTION) {
_filter_delete = true;
}
- return ret;
+
+ auto delete_init = [&]() -> Status {
+ return _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
+ read_params.version.second, this);
+ };
+
+ if (read_params.reader_type == READER_ALTER_TABLE) {
+ return delete_init();
+ }
+
+ std::shared_lock rdlock(_tablet->get_header_lock());
+ return delete_init();
}
} // namespace doris
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index f91145d07c..2593a51bae 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -20,20 +20,10 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <thrift/protocol/TDebugProtocol.h>
-#include <list>
-#include <memory>
-#include <queue>
-#include <sstream>
-#include <stack>
-#include <string>
-#include <utility>
-#include <vector>
-
#include "exprs/bloomfilter_predicate.h"
#include "olap/column_predicate.h"
#include "olap/delete_handler.h"
#include "olap/olap_cond.h"
-#include "olap/olap_define.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet.h"
@@ -130,7 +120,7 @@ public:
uint64_t filtered_rows() const {
return _stats.rows_del_filtered + _stats.rows_conditions_filtered +
- _stats.rows_vec_del_cond_filtered;
+ _stats.rows_vec_del_cond_filtered + _stats.rows_vec_cond_filtered;
}
void set_batch_size(int batch_size) { _batch_size = batch_size; }
diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h
index 564980bac8..de1251f13f 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -47,7 +47,8 @@ public:
// Return the total number of filtered rows, will be used for validation of schema change
int64_t filtered_rows() override {
- return _stats->rows_del_filtered + _stats->rows_conditions_filtered;
+ return _stats->rows_del_filtered + _stats->rows_conditions_filtered +
+ _stats->rows_vec_del_cond_filtered + _stats->rows_vec_cond_filtered;
}
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index f7d86d8d86..68724d29bc 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -17,15 +17,12 @@
#include "olap/schema_change.h"
-#include <pthread.h>
-#include <signal.h>
-
-#include <algorithm>
#include <vector>
-#include "agent/cgroups_mgr.h"
-#include "common/resource_tls.h"
+#include "common/status.h"
+#include "gutil/integral_types.h"
#include "olap/merger.h"
+#include "olap/olap_common.h"
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
@@ -33,21 +30,23 @@
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/wrapper_field.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_pool.h"
#include "runtime/mem_tracker.h"
#include "util/defer_op.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/olap/block_reader.h"
-using std::deque;
-using std::list;
using std::nothrow;
-using std::pair;
-using std::string;
-using std::stringstream;
-using std::vector;
namespace doris {
+constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+
class RowBlockSorter {
public:
explicit RowBlockSorter(RowBlockAllocator* allocator);
@@ -92,19 +91,20 @@ private:
std::priority_queue<MergeElement> _heap;
};
-RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) {
+RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl)
+ : _desc_tbl(desc_tbl) {
_schema_mapping.resize(tablet_schema.num_columns());
}
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
- const DeleteHandler* delete_handler) {
+ const DeleteHandler* delete_handler, DescriptorTbl desc_tbl)
+ : _desc_tbl(desc_tbl) {
_schema_mapping.resize(tablet_schema.num_columns());
_delete_handler = delete_handler;
}
RowBlockChanger::~RowBlockChanger() {
- SchemaMapping::iterator it = _schema_mapping.begin();
- for (; it != _schema_mapping.end(); ++it) {
+ for (auto it = _schema_mapping.begin(); it != _schema_mapping.end(); ++it) {
SAFE_DELETE(it->default_value);
}
_schema_mapping.clear();
@@ -212,7 +212,7 @@ public:
}
private:
- typedef std::pair<FieldType, FieldType> convert_type_pair;
+ using convert_type_pair = std::pair<FieldType, FieldType>;
std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set;
DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver);
@@ -230,7 +230,7 @@ ConvertTypeResolver::ConvertTypeResolver() {
add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_DATE>();
// supported type convert should annotate in doc:
- // http://doris.incubator.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description
+ // https://doris.apache.org/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.html#alter-table-column
// If type convert is supported here, you should check fe/src/main/java/org/apache/doris/catalog/ColumnType.java to supported it either
// from varchar type
add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>();
@@ -287,7 +287,7 @@ ConvertTypeResolver::ConvertTypeResolver() {
add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>();
}
-ConvertTypeResolver::~ConvertTypeResolver() {}
+ConvertTypeResolver::~ConvertTypeResolver() = default;
bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool) {
@@ -496,7 +496,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
if (_schema_mapping[i].ref_column >= 0) {
if (!_schema_mapping[i].materialized_function.empty()) {
bool (*_do_materialized_transform)(RowCursor*, RowCursor*, const TabletColumn&, int,
- int, MemPool*);
+ int, MemPool*) = nullptr;
if (_schema_mapping[i].materialized_function == "to_bitmap") {
_do_materialized_transform = to_bitmap;
} else if (_schema_mapping[i].materialized_function == "hll_hash") {
@@ -545,7 +545,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
mutable_block->get_row(new_row_index++, &write_helper);
ref_block->get_row(row_index, &read_helper);
- if (true == read_helper.is_null(ref_column)) {
+ if (read_helper.is_null(ref_column)) {
write_helper.set_null(i);
} else {
write_helper.set_not_null(i);
@@ -693,7 +693,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
RowCursor helper_row;
auto res = helper_row.init(_swap_row_block->tablet_schema());
- if (!res.ok()) {
+ if (!res) {
LOG(WARNING) << "row cursor init failed.res:" << res;
return false;
}
@@ -807,7 +807,7 @@ bool RowBlockAllocator::is_memory_enough_for_sorting(size_t num_rows, size_t all
RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {}
-RowBlockMerger::~RowBlockMerger() {}
+RowBlockMerger::~RowBlockMerger() = default;
bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
uint64_t* merged_rows) {
@@ -815,14 +815,24 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
RowCursor row_cursor;
std::unique_ptr<MemPool> mem_pool(new MemPool("RowBlockMerger"));
std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
+
+ auto merge_error = [&]() -> bool {
+ while (!_heap.empty()) {
+ MergeElement element = _heap.top();
+ _heap.pop();
+ SAFE_DELETE(element.row_cursor);
+ }
+ return false;
+ };
+
if (row_cursor.init(_tablet->tablet_schema()) != Status::OK()) {
LOG(WARNING) << "fail to init row cursor.";
- goto MERGE_ERR;
+ return merge_error();
}
if (!_make_heap(row_block_arr)) {
// There is error log in _make_heap, so no need to more log.
- goto MERGE_ERR;
+ return merge_error();
}
row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
@@ -835,7 +845,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
if (KeysType::DUP_KEYS == _tablet->keys_type()) {
if (rowset_writer->add_row(row_cursor) != Status::OK()) {
LOG(WARNING) << "fail to add row to rowset writer.";
- goto MERGE_ERR;
+ return merge_error();
}
continue;
}
@@ -850,7 +860,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
agg_finalize_row(&row_cursor, mem_pool.get());
if (rowset_writer->add_row(row_cursor) != Status::OK()) {
LOG(WARNING) << "fail to add row to rowset writer.";
- goto MERGE_ERR;
+ return merge_error();
}
// the memory allocate by mem pool has been copied,
@@ -860,20 +870,11 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
}
if (rowset_writer->flush() != Status::OK()) {
LOG(WARNING) << "failed to finalizing writer.";
- goto MERGE_ERR;
+ return merge_error();
}
*merged_rows = tmp_merged_rows;
return true;
-
-MERGE_ERR:
- while (_heap.size() > 0) {
- MergeElement element = _heap.top();
- _heap.pop();
- SAFE_DELETE(element.row_cursor);
- }
-
- return false;
}
bool RowBlockMerger::_make_heap(const std::vector<RowBlock*>& row_block_arr) {
@@ -914,40 +915,35 @@ void RowBlockMerger::_pop_heap() {
element.row_block->get_row(element.row_block_index, element.row_cursor);
_heap.push(element);
- return;
}
-Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
// In some cases, there may be more than one type of rowset in a tablet,
// in which case the conversion cannot be done directly by linked schema change,
// but requires direct schema change to rewrite the data.
- if (rowset_reader->type() != new_rowset_writer->type()) {
+ if (rowset_reader->type() != rowset_writer->type()) {
LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id()
<< " in base tablet " << base_tablet->tablet_id() << " is not same as type "
- << new_rowset_writer->type() << ", use direct schema change.";
- SchemaChangeDirectly scd(_row_block_changer);
- return scd.process(rowset_reader, new_rowset_writer, new_tablet, base_tablet);
+ << rowset_writer->type() << ", use direct schema change.";
+ return SchemaChangeHandler::get_sc_procedure(_row_block_changer, false, true)
+ ->process(rowset_reader, rowset_writer, new_tablet, base_tablet);
} else {
- Status status = new_rowset_writer->add_rowset_for_linked_schema_change(
+ Status status = rowset_writer->add_rowset_for_linked_schema_change(
rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
- if (!status.ok()) {
+ if (!status) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
<< ", base_tablet=" << base_tablet->full_name()
- << ", version=" << new_rowset_writer->version().first << "-"
- << new_rowset_writer->version().second;
+ << ", version=" << rowset_writer->version().first << "-"
+ << rowset_writer->version().second;
}
return status;
}
}
SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer)
- : SchemaChange(),
- _row_block_changer(row_block_changer),
- _row_block_allocator(nullptr),
- _cursor(nullptr) {}
+ : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {}
SchemaChangeDirectly::~SchemaChangeDirectly() {
VLOG_NOTICE << "~SchemaChangeDirectly()";
@@ -985,9 +981,9 @@ Status reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_pt
return Status::OK();
}
-Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
+ RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
_row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0);
if (_row_block_allocator == nullptr) {
@@ -1010,16 +1006,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
Status res = Status::OK();
- if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
- res = rowset_writer->flush();
- if (!res.ok()) {
- LOG(WARNING) << "create empty version for schema change failed."
- << "version=" << rowset_writer->version().first << "-"
- << rowset_writer->version().second;
- return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
- }
- return Status::OK();
- }
VLOG_NOTICE << "init writer. new_tablet=" << new_tablet->full_name()
<< ", block_row_number=" << new_tablet->num_rows_per_row_block();
@@ -1030,10 +1016,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
}
});
- // Reset filtered_rows and merged_rows statistic
- reset_merged_rows();
- reset_filtered_rows();
-
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
@@ -1049,7 +1031,7 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
// rows filtered by delete handler one by one
- add_filtered_rows(filtered_rows);
+ _add_filtered_rows(filtered_rows);
if (!_write_row_block(rowset_writer, new_row_block.get())) {
res = Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID);
@@ -1065,48 +1047,25 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
- // rows filtered by zone map against delete handler
- add_filtered_rows(rowset_reader->filtered_rows());
-
- // Check row num changes
- if (config::row_nums_check) {
- if (rowset_reader->rowset()->num_rows() !=
- rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
- LOG(WARNING) << "fail to check row num! "
- << "source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
- << ", new_index_rows=" << rowset_writer->num_rows();
- res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
- }
- }
- LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
- << ", new_index_rows=" << rowset_writer->num_rows();
return res;
}
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
size_t memory_limitation)
- : SchemaChange(),
- _row_block_changer(row_block_changer),
+ : _row_block_changer(row_block_changer),
_memory_limitation(memory_limitation),
- _row_block_allocator(nullptr) {
- // Every time SchemaChange is used for external rowing, some temporary versions (such as 999, 1000, 1001) will be written, in order to avoid Cache conflicts, temporary
- // The version number takes a BIG NUMBER plus the version number of the current SchemaChange
- _temp_delta_versions.first = (1 << 28);
- _temp_delta_versions.second = (1 << 28);
- // TODO(zyh): remove the magic number
-}
+ _temp_delta_versions(Version::mock()),
+ _row_block_allocator(nullptr) {}
SchemaChangeWithSorting::~SchemaChangeWithSorting() {
VLOG_NOTICE << "~SchemaChangeWithSorting()";
SAFE_DELETE(_row_block_allocator);
}
-Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+ RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
_row_block_allocator =
new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
@@ -1119,17 +1078,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
Status res = Status::OK();
RowsetSharedPtr rowset = rowset_reader->rowset();
- if (rowset->empty() || rowset->num_rows() == 0) {
- res = new_rowset_writer->flush();
- if (!res.ok()) {
- LOG(WARNING) << "create empty version for schema change failed."
- << " version=" << new_rowset_writer->version().first << "-"
- << new_rowset_writer->version().second;
- return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
- }
- return Status::OK();
- }
-
RowBlockSorter row_block_sorter(_row_block_allocator);
// for internal sorting
@@ -1155,10 +1103,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
_temp_delta_versions.first = _temp_delta_versions.second;
- // Reset filtered_rows and merged_rows statistic
- reset_merged_rows();
- reset_filtered_rows();
-
SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
@@ -1180,7 +1124,7 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
if (new_row_block == nullptr) {
- if (row_block_arr.size() < 1) {
+ if (row_block_arr.empty()) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< "memory_limitation=" << _memory_limitation;
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
@@ -1212,12 +1156,12 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
uint64_t filtered_rows = 0;
res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
new_row_block, &filtered_rows);
- if (!res.ok()) {
+ if (!res) {
row_block_arr.push_back(new_row_block);
LOG(WARNING) << "failed to change data in row block.";
return res;
}
- add_filtered_rows(filtered_rows);
+ _add_filtered_rows(filtered_rows);
if (new_row_block->row_block_info().row_num > 0) {
if (!row_block_sorter.sort(&new_row_block)) {
@@ -1260,35 +1204,18 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
if (src_rowsets.empty()) {
- res = new_rowset_writer->flush();
- if (!res.ok()) {
+ res = rowset_writer->flush();
+ if (!res) {
LOG(WARNING) << "create empty version for schema change failed."
- << " version=" << new_rowset_writer->version().first << "-"
- << new_rowset_writer->version().second;
+ << " version=" << rowset_writer->version().first << "-"
+ << rowset_writer->version().second;
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
- } else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) {
+ } else if (!_external_sorting(src_rowsets, rowset_writer, new_tablet)) {
LOG(WARNING) << "failed to sorting externally.";
return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
}
- add_filtered_rows(rowset_reader->filtered_rows());
-
- // Check row num changes
- if (config::row_nums_check) {
- if (rowset_reader->rowset()->num_rows() !=
- new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
- LOG(WARNING) << "fail to check row num!"
- << " source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
- << ", new_index_rows=" << new_rowset_writer->num_rows();
- res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
- }
- }
- LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
- << ", new_index_rows=" << new_rowset_writer->num_rows();
return res;
}
@@ -1315,7 +1242,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
}
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
- add_merged_rows(merged_rows);
+ _add_merged_rows(merged_rows);
*rowset = rowset_writer->build();
return true;
}
@@ -1327,31 +1254,27 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
for (auto& rowset : src_rowsets) {
RowsetReaderSharedPtr rs_reader;
auto res = rowset->create_reader(&rs_reader);
- if (!res.ok()) {
+ if (!res) {
LOG(WARNING) << "failed to create rowset reader.";
return false;
}
- rs_readers.push_back(std::move(rs_reader));
+ rs_readers.push_back(rs_reader);
}
Merger::Statistics stats;
auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer,
&stats);
- if (!res.ok()) {
+ if (!res) {
LOG(WARNING) << "failed to merge rowsets. tablet=" << new_tablet->full_name()
<< ", version=" << rowset_writer->version().first << "-"
<< rowset_writer->version().second;
return false;
}
- add_merged_rows(stats.merged_rows);
- add_filtered_rows(stats.filtered_rows);
+ _add_merged_rows(stats.merged_rows);
+ _add_filtered_rows(stats.filtered_rows);
return true;
}
-SchemaChangeHandler::SchemaChangeHandler() {}
-
-SchemaChangeHandler::~SchemaChangeHandler() {}
-
Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
<< ", new_tablet_id=" << request.new_tablet_id
@@ -1377,6 +1300,9 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req
return res;
}
+std::shared_mutex SchemaChangeHandler::_mutex;
+std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
+
// In the past schema change and rollup will create new tablet and will wait for txns starting before the task to finished
// It will cost a lot of time to wait and the task is very difficult to understand.
// In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
@@ -1457,12 +1383,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+ reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
+ reader_context.is_vec = config::enable_vectorized_alter_table;
do {
RowsetSharedPtr max_rowset;
// get history data to be converted and it will check if there is hold in base tablet
- res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
- if (!res.ok()) {
+ // Keep the returned Status in `res`: it is printed in the warning below and it is
+ // what the code after this loop tests to decide whether the conversion may go on.
+ // Testing the call result without storing it would leave `res` OK after `break`.
+ res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
+ if (!res) {
LOG(WARNING) << "fail to get version to be changed. res=" << res;
break;
}
@@ -1514,27 +1441,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
// init one delete handler
- int32_t end_version = -1;
+ int64_t end_version = -1;
for (auto& version : versions_to_be_changed) {
- if (version.second > end_version) {
- end_version = version.second;
- }
- }
-
- res = delete_handler.init(base_tablet->tablet_schema(),
- base_tablet->delete_predicates(), end_version);
- if (!res.ok()) {
- LOG(WARNING) << "init delete handler failed. base_tablet="
- << base_tablet->full_name() << ", end_version=" << end_version;
-
- // release delete handlers which have been inited successfully.
- delete_handler.finalize();
- break;
+ end_version = std::max(end_version, version.second);
}
// acquire data sources correspond to history versions
base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
- if (rs_readers.size() < 1) {
+ if (rs_readers.empty()) {
LOG(WARNING) << "fail to acquire all data sources. "
<< "version_num=" << versions_to_be_changed.size()
<< ", data_source_num=" << rs_readers.size();
@@ -1542,22 +1456,47 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
break;
}
+ vectorized::BlockReader reader;
+ TabletReader::ReaderParams reader_params;
+ reader_params.tablet = base_tablet;
+ reader_params.reader_type = READER_ALTER_TABLE;
+ reader_params.rs_readers = rs_readers;
+ const auto& schema = base_tablet->tablet_schema();
+ reader_params.return_columns.resize(schema.num_columns());
+ std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
+ reader_params.origin_return_columns = &reader_params.return_columns;
+ reader_params.version = {0, end_version};
+ // BlockReader::init will call base_tablet->get_header_lock(), but this lock we already get at outer layer, so we just call TabletReader::init
+ RETURN_NOT_OK(reader.TabletReader::init(reader_params));
+
+ res = delete_handler.init(base_tablet->tablet_schema(),
+ base_tablet->delete_predicates(), end_version, &reader);
+ if (!res) {
+ LOG(WARNING) << "init delete handler failed. base_tablet="
+ << base_tablet->full_name() << ", end_version=" << end_version;
+
+ // release delete handlers which have been inited successfully.
+ delete_handler.finalize();
+ break;
+ }
+
for (auto& rs_reader : rs_readers) {
res = rs_reader->init(&reader_context);
- if (!res.ok()) {
+ if (!res) {
LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
break;
}
}
-
- } while (0);
+ } while (false);
}
do {
- if (!res.ok()) {
+ if (!res) {
break;
}
SchemaChangeParams sc_params;
+
+ DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl);
sc_params.base_tablet = base_tablet;
sc_params.new_tablet = new_tablet;
sc_params.ref_rowset_readers = rs_readers;
@@ -1588,6 +1527,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}
+ mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
std::make_pair(item.column_name, mv_param));
@@ -1602,26 +1542,26 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
- if (!res.ok()) {
+ if (!res) {
break;
}
// set state to ready
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
- if (!res.ok()) {
+ if (!res) {
break;
}
new_tablet->save_meta();
- } while (0);
+ } while (false);
- if (res.ok()) {
+ if (res) {
// _validate_alter_result should be outside the above while loop.
// to avoid requiring the header lock twice.
res = _validate_alter_result(new_tablet, request);
}
// if failed convert history data, then just remove the new tablet
- if (!res.ok()) {
+ if (!res) {
LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name()
<< ", drop new_tablet=" << new_tablet->full_name();
// do not drop the new tablet and its data. GC thread will
@@ -1638,7 +1578,8 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
RowsetSharedPtr* base_rowset,
- RowsetSharedPtr* new_rowset) {
+ RowsetSharedPtr* new_rowset,
+ DescriptorTbl desc_tbl) {
Status res = Status::OK();
LOG(INFO) << "begin to convert delta version for schema changing. "
<< "base_tablet=" << base_tablet->full_name()
@@ -1646,13 +1587,14 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
// a. Parse the Alter request and convert it into an internal representation
// Do not use the delete condition specified by the DELETE_DATA command
- RowBlockChanger rb_changer(new_tablet->tablet_schema());
+ RowBlockChanger rb_changer(new_tablet->tablet_schema(), desc_tbl);
bool sc_sorting = false;
bool sc_directly = false;
const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
- if (!(res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
- materialized_function_map))) {
+ if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
+ materialized_function_map, desc_tbl);
+ !res) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
}
@@ -1660,24 +1602,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
// NOTE split_table if row_block is used, the original block will become smaller
// But since the historical data will become normal after the subsequent base/cumulative, it is also possible to use directly
// b. Generate historical data converter
- SchemaChange* sc_procedure = nullptr;
- if (sc_sorting) {
- LOG(INFO) << "doing schema change with sorting for base_tablet "
- << base_tablet->full_name();
- sc_procedure = new (nothrow) SchemaChangeWithSorting(
- rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
- } else if (sc_directly) {
- LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name();
- sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
- } else {
- LOG(INFO) << "doing linked schema change for base_tablet " << base_tablet->full_name();
- sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
- }
-
- if (sc_procedure == nullptr) {
- LOG(FATAL) << "failed to malloc SchemaChange. size=" << sizeof(SchemaChangeWithSorting);
- return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
- }
+ auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
// c. Convert data
DeleteHandler delete_handler;
@@ -1697,6 +1622,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+ reader_context.is_vec = config::enable_vectorized_alter_table;
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
@@ -1709,8 +1635,19 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
(*base_rowset)->txn_id(), load_id, PREPARED,
(*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer));
- if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
- base_tablet)) != Status::OK()) {
+ auto schema_version_convert_error = [&]() -> Status {
+ if (*new_rowset != nullptr) {
+ StorageEngine::instance()->add_unused_rowset(*new_rowset);
+ }
+
+ LOG(WARNING) << "failed to convert rowsets. "
+ << " base_tablet=" << base_tablet->full_name()
+ << ", new_tablet=" << new_tablet->full_name() << " res = " << res;
+ return res;
+ };
+
+ if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet);
+ !res) {
if ((*base_rowset)->is_pending()) {
LOG(WARNING) << "failed to process the transaction when schema change. "
<< "tablet=" << new_tablet->full_name() << "'"
@@ -1722,7 +1659,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
}
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
- goto SCHEMA_VERSION_CONVERT_ERR;
+ return schema_version_convert_error();
}
*new_rowset = rowset_writer->build();
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
@@ -1730,25 +1667,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
if (*new_rowset == nullptr) {
LOG(WARNING) << "build rowset failed.";
res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
- goto SCHEMA_VERSION_CONVERT_ERR;
+ return schema_version_convert_error();
}
- SAFE_DELETE(sc_procedure);
LOG(INFO) << "successfully convert rowsets. "
<< " base_tablet=" << base_tablet->full_name()
<< ", new_tablet=" << new_tablet->full_name();
return res;
-
-SCHEMA_VERSION_CONVERT_ERR:
- if (*new_rowset != nullptr) {
- StorageEngine::instance()->add_unused_rowset(*new_rowset);
- }
-
- SAFE_DELETE(sc_procedure);
- LOG(WARNING) << "failed to convert rowsets. "
- << " base_tablet=" << base_tablet->full_name()
- << ", new_tablet=" << new_tablet->full_name() << " res = " << res;
- return res;
}
Status SchemaChangeHandler::_get_versions_to_be_changed(
@@ -1782,42 +1707,41 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
// Add filter information in change, and filter column information will be set in _parse_request
// And filter some data every time the row block changes
- RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);
+ RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler,
+ *sc_params.desc_tbl);
bool sc_sorting = false;
bool sc_directly = false;
- SchemaChange* sc_procedure = nullptr;
// a.Parse the Alter request and convert it into an internal representation
- Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
- &sc_sorting, &sc_directly, sc_params.materialized_params_map);
- if (!res.ok()) {
+ Status res =
+ _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, &sc_sorting,
+ &sc_directly, sc_params.materialized_params_map, *sc_params.desc_tbl);
+
+ auto process_alter_exit = [&]() -> Status {
+ {
+ // save tablet meta here because rowset meta is not saved during add rowset
+ std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
+ sc_params.new_tablet->save_meta();
+ }
+ if (res) {
+ Version test_version(0, end_version);
+ res = sc_params.new_tablet->check_version_integrity(test_version);
+ }
+
+ LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
+ << "base_tablet=" << sc_params.base_tablet->full_name()
+ << ", new_tablet=" << sc_params.new_tablet->full_name();
+ return res;
+ };
+
+ if (!res) {
LOG(WARNING) << "failed to parse the request. res=" << res;
- goto PROCESS_ALTER_EXIT;
+ return process_alter_exit();
}
// b. Generate historical data converter
- if (sc_sorting) {
- LOG(INFO) << "doing schema change with sorting for base_tablet "
- << sc_params.base_tablet->full_name();
- sc_procedure = new (nothrow) SchemaChangeWithSorting(
- rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
- } else if (sc_directly) {
- LOG(INFO) << "doing schema change directly for base_tablet "
- << sc_params.base_tablet->full_name();
- sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
- } else {
- LOG(INFO) << "doing linked schema change for base_tablet "
- << sc_params.base_tablet->full_name();
- sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
- }
-
- if (sc_procedure == nullptr) {
- LOG(WARNING) << "failed to malloc SchemaChange. "
- << "malloc_size=" << sizeof(SchemaChangeWithSorting);
- res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
- goto PROCESS_ALTER_EXIT;
- }
+ auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
// c.Convert historical data
for (auto& rs_reader : sc_params.ref_rowset_readers) {
@@ -1834,19 +1758,20 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
Status status = new_tablet->create_rowset_writer(
rs_reader->version(), VISIBLE,
rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer);
- if (!status.ok()) {
+ if (!status) {
res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
- goto PROCESS_ALTER_EXIT;
+ return process_alter_exit();
}
- if ((res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
- sc_params.base_tablet)) != Status::OK()) {
+ if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
+ sc_params.base_tablet);
+ !res) {
LOG(WARNING) << "failed to process the version."
<< " version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
- goto PROCESS_ALTER_EXIT;
+ return process_alter_exit();
}
new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
rowset_writer->rowset_id().to_string());
@@ -1856,7 +1781,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
RowsetSharedPtr new_rowset = rowset_writer->build();
if (new_rowset == nullptr) {
LOG(WARNING) << "failed to build rowset, exit alter process";
- goto PROCESS_ALTER_EXIT;
+ return process_alter_exit();
}
res = sc_params.new_tablet->add_rowset(new_rowset);
if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
@@ -1865,13 +1790,13 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< rs_reader->version().first << "-" << rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
res = Status::OK();
- } else if (!res.ok()) {
+ } else if (!res) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->full_name()
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
- goto PROCESS_ALTER_EXIT;
+ return process_alter_exit();
} else {
VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name()
<< ", version=" << rs_reader->version().first << "-"
@@ -1882,22 +1807,9 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< " version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
- // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
-PROCESS_ALTER_EXIT : {
- // save tablet meta here because rowset meta is not saved during add rowset
- std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
- sc_params.new_tablet->save_meta();
-}
- if (res.ok()) {
- Version test_version(0, end_version);
- res = sc_params.new_tablet->check_version_integrity(test_version);
- }
- SAFE_DELETE(sc_procedure);
- LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
- << "base_tablet=" << sc_params.base_tablet->full_name()
- << ", new_tablet=" << sc_params.new_tablet->full_name();
- return res;
+ // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
+ return process_alter_exit();
}
// @static
@@ -1906,9 +1818,8 @@ Status SchemaChangeHandler::_parse_request(
TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer,
bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
- materialized_function_map) {
- Status res = Status::OK();
-
+ materialized_function_map,
+ DescriptorTbl desc_tbl) {
// set column mapping
for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns();
i < new_schema_size; ++i) {
@@ -1933,11 +1844,10 @@ Status SchemaChangeHandler::_parse_request(
}
if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
- AlterMaterializedViewParam mvParam =
- materialized_function_map.find(column_name)->second;
+ auto mvParam = materialized_function_map.find(column_name)->second;
column_mapping->materialized_function = mvParam.mv_expr;
- std::string origin_column_name = mvParam.origin_column_name;
- int32_t column_index = base_tablet->field_index(origin_column_name);
+ column_mapping->expr = mvParam.expr;
+ int32_t column_index = base_tablet->field_index(mvParam.origin_column_name);
if (column_index >= 0) {
column_mapping->ref_column = column_index;
continue;
@@ -1961,10 +1871,8 @@ Status SchemaChangeHandler::_parse_request(
if (i < base_tablet->num_short_key_columns()) {
*sc_directly = true;
}
- res = _init_column_mapping(column_mapping, new_column, new_column.default_value());
- if (!res) {
- return res;
- }
+ RETURN_IF_ERROR(
+ _init_column_mapping(column_mapping, new_column, new_column.default_value()));
VLOG_TRACE << "A column with default value will be added after schema changing. "
<< "column=" << column_name << ", default_value=" << new_column.default_value();
@@ -2000,7 +1908,7 @@ Status SchemaChangeHandler::_parse_request(
// If the sort of key has not been changed but the new keys num is less then base's,
// the new table should be re agg.
- // So we also need to set sc_sorting = true.
+ // So we also need to set sc_sorting = true.
// A, B, C are keys(sort keys), D is value
// followings need resort:
// old keys: A B C D
@@ -2025,23 +1933,12 @@ Status SchemaChangeHandler::_parse_request(
if (column_mapping->ref_column < 0) {
continue;
} else {
- if (new_tablet_schema.column(i).type() !=
- ref_tablet_schema.column(column_mapping->ref_column).type()) {
- *sc_directly = true;
- return Status::OK();
- } else if ((new_tablet_schema.column(i).type() ==
- ref_tablet_schema.column(column_mapping->ref_column).type()) &&
- (new_tablet_schema.column(i).length() !=
- ref_tablet_schema.column(column_mapping->ref_column).length())) {
- *sc_directly = true;
- return Status::OK();
-
- } else if (new_tablet_schema.column(i).is_bf_column() !=
- ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) {
- *sc_directly = true;
- return Status::OK();
- } else if (new_tablet_schema.column(i).has_bitmap_index() !=
- ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) {
+ // Bind by const reference: only four attributes are compared, so there is no
+ // reason to copy both column objects on every iteration of the loop.
+ const auto& column_new = new_tablet_schema.column(i);
+ const auto& column_old = ref_tablet_schema.column(column_mapping->ref_column);
+ // Any difference in type, length, bloom-filter flag or bitmap-index flag means the
+ // data has to be rewritten, so a linked schema change is not possible.
+ if (column_new.type() != column_old.type() ||
+ column_new.length() != column_old.length() ||
+ column_new.is_bf_column() != column_old.is_bf_column() ||
+ column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
*sc_directly = true;
return Status::OK();
}
@@ -2049,7 +1946,7 @@ Status SchemaChangeHandler::_parse_request(
}
if (base_tablet->delete_predicates().size() != 0) {
- //there exists delete condition in header, can't do linked schema change
+ // there exists delete condition in header, can't do linked schema change
*sc_directly = true;
}
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 5c8757613d..2f820ae79e 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -17,29 +17,17 @@
#pragma once
-#include <deque>
-#include <functional>
-#include <queue>
-#include <utility>
-#include <vector>
-
+#include "common/status.h"
#include "gen_cpp/AgentService_types.h"
#include "olap/column_mapping.h"
#include "olap/delete_handler.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
namespace doris {
-// defined in 'field.h'
-class Field;
-class FieldInfo;
-// defined in 'tablet.h'
-class Tablet;
-// defined in 'row_block.h'
-class RowBlock;
-// defined in 'row_cursor.h'
-class RowCursor;
bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
int field_idx, int ref_field_idx, MemPool* mem_pool);
@@ -50,11 +38,12 @@ bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletCo
class RowBlockChanger {
public:
- RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler);
+ RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler,
+ DescriptorTbl desc_tbl);
- RowBlockChanger(const TabletSchema& tablet_schema);
+ RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl);
- virtual ~RowBlockChanger();
+ ~RowBlockChanger();
ColumnMapping* get_mutable_column_mapping(size_t column_index);
@@ -70,6 +59,8 @@ private:
// delete handler for filtering data which use specified in DELETE_DATA
const DeleteHandler* _delete_handler = nullptr;
+ DescriptorTbl _desc_tbl;
+
DISALLOW_COPY_AND_ASSIGN(RowBlockChanger);
};
@@ -94,20 +85,60 @@ public:
SchemaChange() : _filtered_rows(0), _merged_rows(0) {}
virtual ~SchemaChange() = default;
- virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
- TabletSharedPtr tablet, TabletSharedPtr base_tablet) = 0;
-
- void add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
-
- void add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
+    // Converts one source rowset into `rowset_writer`.
+    //
+    // Flow:
+    //   1. Clear the per-call filtered/merged counters.
+    //   2. If the source rowset is empty, only flush the writer so that an empty version
+    //      is created, then return.
+    //   3. Otherwise delegate the conversion to _inner_process(), add the rows filtered by
+    //      the reader to the filtered count and, when config::row_nums_check is enabled,
+    //      verify that source rows == written rows + merged rows + filtered rows.
+    //
+    // Returns Status::OK() on success, the error produced by _inner_process() if it fails,
+    // or OLAP_ERR_ALTER_STATUS_ERR when the row-count check does not hold. A failing flush
+    // of the empty version is reported through RETURN_WITH_WARN_IF_ERROR (presumably as
+    // the OLAP_ERR_INPUT_PARAMETER_ERROR status passed to it - confirm against the macro).
+    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                           TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
+        // Reset before any early return so that filtered_rows()/merged_rows() never report
+        // numbers left over from a previously processed rowset.
+        _filtered_rows = 0;
+        _merged_rows = 0;
+
+        if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
+            RETURN_WITH_WARN_IF_ERROR(
+                    rowset_writer->flush(),
+                    Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR),
+                    fmt::format("create empty version for schema change failed. version= {}-{}",
+                                rowset_writer->version().first, rowset_writer->version().second));
+
+            return Status::OK();
+        }
+
+        RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet));
+        // Rows dropped by the reader itself also count as filtered.
+        _add_filtered_rows(rowset_reader->filtered_rows());
+
+        // Check row num changes
+        if (config::row_nums_check && !_check_row_nums(rowset_reader, *rowset_writer)) {
+            return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
+        }
+
+        LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
+                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
+                  << ", new_index_rows=" << rowset_writer->num_rows();
+        return Status::OK();
+    }
uint64_t filtered_rows() const { return _filtered_rows; }
uint64_t merged_rows() const { return _merged_rows; }
- void reset_filtered_rows() { _filtered_rows = 0; }
+protected:
+ void _add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
- void reset_merged_rows() { _merged_rows = 0; }
+ void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
+
+    // Conversion hook called by process() for a non-empty rowset. The default
+    // implementation converts nothing and always returns Status::NotSupported;
+    // SchemaChangeDirectly and SchemaChangeWithSorting override it with the real work.
+    virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                                  TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
+        return Status::NotSupported("inner process unsupported.");
+    }
+
+    // Row accounting check: the number of rows in the source rowset must equal the rows
+    // written by `writer` plus the rows recorded as merged and filtered. Logs a warning
+    // with all four numbers and returns false when the totals disagree.
+    bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const {
+        const bool row_nums_match =
+                reader->rowset()->num_rows() == writer.num_rows() + _merged_rows + _filtered_rows;
+        if (row_nums_match) {
+            return true;
+        }
+
+        LOG(WARNING) << "fail to check row num! "
+                     << "source_rows=" << reader->rowset()->num_rows()
+                     << ", merged_rows=" << merged_rows()
+                     << ", filtered_rows=" << filtered_rows()
+                     << ", new_index_rows=" << writer.num_rows();
+        return false;
+    }
private:
uint64_t _filtered_rows;
@@ -117,11 +148,11 @@ private:
class LinkedSchemaChange : public SchemaChange {
public:
explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer)
- : SchemaChange(), _row_block_changer(row_block_changer) {}
- ~LinkedSchemaChange() {}
+ : _row_block_changer(row_block_changer) {}
+ ~LinkedSchemaChange() override = default;
- virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+ Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
private:
const RowBlockChanger& _row_block_changer;
@@ -134,12 +165,12 @@ public:
// @params tablet the instance of tablet which has new schema.
// @params row_block_changer changer to modify the data of RowBlock
explicit SchemaChangeDirectly(const RowBlockChanger& row_block_changer);
- virtual ~SchemaChangeDirectly();
-
- virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+ ~SchemaChangeDirectly() override;
private:
+ Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
const RowBlockChanger& _row_block_changer;
RowBlockAllocator* _row_block_allocator;
RowCursor* _cursor;
@@ -154,12 +185,12 @@ class SchemaChangeWithSorting : public SchemaChange {
public:
explicit SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
size_t memory_limitation);
- virtual ~SchemaChangeWithSorting();
-
- virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
- TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+ ~SchemaChangeWithSorting() override;
private:
+ Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& temp_delta_versions, TabletSharedPtr new_tablet,
SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset);
@@ -177,18 +208,26 @@ private:
class SchemaChangeHandler {
public:
- static SchemaChangeHandler* instance() {
- static SchemaChangeHandler instance;
- return &instance;
- }
-
- Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
- RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset);
+ static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
+ RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset,
+ DescriptorTbl desc_tbl);
// schema change v2, it will not set alter task in base tablet
- Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
+ static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
+
+    // Factory for the schema-change strategy. `sc_sorting` takes precedence and selects
+    // SchemaChangeWithSorting (bounded by
+    // config::memory_limitation_per_thread_for_schema_change_bytes); otherwise
+    // `sc_directly` selects SchemaChangeDirectly; with neither flag set a
+    // LinkedSchemaChange is returned. The caller owns the returned object.
+    static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer,
+                                                          bool sc_sorting, bool sc_directly) {
+        std::unique_ptr<SchemaChange> procedure;
+        if (sc_sorting) {
+            procedure = std::make_unique<SchemaChangeWithSorting>(
+                    rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+        } else if (sc_directly) {
+            procedure = std::make_unique<SchemaChangeDirectly>(rb_changer);
+        } else {
+            procedure = std::make_unique<LinkedSchemaChange>(rb_changer);
+        }
+        return procedure;
+    }
- bool tablet_in_converting(int64_t tablet_id);
+ static bool tablet_in_converting(int64_t tablet_id);
private:
// Check the status of schema change and clear information between "a pair" of Schema change tables
@@ -196,17 +235,18 @@ private:
// Returns:
// Success: If there is historical information, then clear it if there is no problem; or no historical information
// Failure: otherwise, if there is history information and it cannot be emptied (version has not been completed)
- Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
- const TAlterTabletReq& request);
+ static Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
+ const TAlterTabletReq& request);
- Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
- std::vector<Version>* versions_to_be_changed,
- RowsetSharedPtr* max_rowset);
+ static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
+ std::vector<Version>* versions_to_be_changed,
+ RowsetSharedPtr* max_rowset);
struct AlterMaterializedViewParam {
std::string column_name;
std::string origin_column_name;
std::string mv_expr;
+ std::shared_ptr<TExpr> expr;
};
struct SchemaChangeParams {
@@ -216,31 +256,29 @@ private:
std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
DeleteHandler* delete_handler = nullptr;
std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
+ DescriptorTbl* desc_tbl = nullptr;
+ ObjectPool pool;
};
- Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
+ static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
- Status _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request);
+ static Status _validate_alter_result(TabletSharedPtr new_tablet,
+ const TAlterTabletReqV2& request);
- Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
+ static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly,
const std::unordered_map<std::string, AlterMaterializedViewParam>&
- materialized_function_map);
+ materialized_function_map,
+ DescriptorTbl desc_tbl);
// Initialization Settings for creating a default value
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema, const std::string& value);
-private:
- SchemaChangeHandler();
- virtual ~SchemaChangeHandler();
- SchemaChangeHandler(const SchemaChangeHandler&) = delete;
- SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
-
- std::shared_mutex _mutex;
- std::unordered_set<int64_t> _tablet_ids_in_converting;
+ static std::shared_mutex _mutex;
+ static std::unordered_set<int64_t> _tablet_ids_in_converting;
};
using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 85d8f7e4ad..771069e4ce 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -732,8 +732,7 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
// Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
// removed. So, we only need to do the compaction when it is being converted.
// After being converted, tablet's state will be changed to TABLET_RUNNING.
- auto schema_change_handler = SchemaChangeHandler::instance();
- return schema_change_handler->tablet_in_converting(tablet_id());
+ return SchemaChangeHandler::tablet_in_converting(tablet_id());
}
return true;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 24496822d3..55ba6ab6e9 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -23,8 +23,6 @@
namespace doris {
-using std::to_string;
-
EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
_mem_tracker = MemTracker::create_tracker(
@@ -39,8 +37,7 @@ Status EngineAlterTabletTask::execute() {
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
- auto schema_change_handler = SchemaChangeHandler::instance();
- Status res = schema_change_handler->process_alter_tablet_v2(_alter_tablet_req);
+ Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
if (!res.ok()) {
LOG(WARNING) << "failed to do alter task. res=" << res
@@ -53,8 +50,8 @@ Status EngineAlterTabletTask::execute() {
}
LOG(INFO) << "success to create new alter tablet. res=" << res
- << " base_tablet_id=" << _alter_tablet_req.base_tablet_id << ", base_schema_hash"
- << _alter_tablet_req.base_schema_hash
+ << " base_tablet_id=" << _alter_tablet_req.base_tablet_id
+ << ", base_schema_hash=" << _alter_tablet_req.base_schema_hash
<< ", new_tablet_id=" << _alter_tablet_req.new_tablet_id
<< ", new_schema_hash=" << _alter_tablet_req.new_schema_hash;
return res;
diff --git a/be/src/olap/task/engine_alter_tablet_task.h b/be/src/olap/task/engine_alter_tablet_task.h
index 1a2c0b3efa..7cc97395f1 100644
--- a/be/src/olap/task/engine_alter_tablet_task.h
+++ b/be/src/olap/task/engine_alter_tablet_task.h
@@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
-#define DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
+#pragma once
#include "gen_cpp/AgentService_types.h"
#include "olap/olap_define.h"
@@ -28,11 +27,11 @@ namespace doris {
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineAlterTabletTask : public EngineTask {
public:
- virtual Status execute();
+ Status execute() override;
public:
EngineAlterTabletTask(const TAlterTabletReqV2& alter_tablet_request);
- ~EngineAlterTabletTask() {}
+ ~EngineAlterTabletTask() = default;
private:
const TAlterTabletReqV2& _alter_tablet_req;
@@ -41,4 +40,3 @@ private:
}; // EngineTask
} // namespace doris
-#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
\ No newline at end of file
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp
index fe00536662..118213b657 100644
--- a/be/src/olap/task/engine_storage_migration_task_v2.cpp
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -50,8 +50,8 @@ Status EngineStorageMigrationTaskV2::execute() {
}
LOG(INFO) << "success to create new storage migration v2. res=" << res
- << " base_tablet_id=" << _storage_migration_req.base_tablet_id << ", base_schema_hash"
- << _storage_migration_req.base_schema_hash
+ << " base_tablet_id=" << _storage_migration_req.base_tablet_id
+ << ", base_schema_hash=" << _storage_migration_req.base_schema_hash
<< ", new_tablet_id=" << _storage_migration_req.new_tablet_id
<< ", new_schema_hash=" << _storage_migration_req.new_schema_hash;
return res;
diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp
index c7a9a2188c..b7a50b6808 100644
--- a/be/src/olap/tuple_reader.cpp
+++ b/be/src/olap/tuple_reader.cpp
@@ -23,14 +23,10 @@
#include <unordered_set>
#include "olap/collect_iterator.h"
+#include "olap/olap_common.h"
#include "olap/row.h"
-#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/schema.h"
-#include "olap/storage_engine.h"
#include "runtime/mem_pool.h"
-#include "util/date_func.h"
using std::nothrow;
using std::set;
diff --git a/be/src/olap/tuple_reader.h b/be/src/olap/tuple_reader.h
index 844135a327..7045393e1b 100644
--- a/be/src/olap/tuple_reader.h
+++ b/be/src/olap/tuple_reader.h
@@ -20,26 +20,11 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <thrift/protocol/TDebugProtocol.h>
-#include <list>
-#include <memory>
-#include <queue>
-#include <sstream>
-#include <stack>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "exprs/bloomfilter_predicate.h"
#include "olap/collect_iterator.h"
-#include "olap/column_predicate.h"
#include "olap/delete_handler.h"
-#include "olap/olap_cond.h"
-#include "olap/olap_define.h"
#include "olap/reader.h"
#include "olap/row_cursor.h"
#include "olap/rowset/rowset_reader.h"
-#include "olap/tablet.h"
-#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index aa762048b7..4b81fc2cfe 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -319,7 +319,9 @@ std::string TupleDescriptor::debug_string() const {
RowDescriptor::RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
const std::vector<bool>& nullable_tuples)
: _tuple_idx_nullable_map(nullable_tuples) {
- DCHECK(nullable_tuples.size() == row_tuples.size());
+ DCHECK(nullable_tuples.size() == row_tuples.size())
+ << "nullable_tuples size " << nullable_tuples.size() << " != row_tuples size "
+ << row_tuples.size();
DCHECK_GT(row_tuples.size(), 0);
_num_materialized_slots = 0;
_num_null_slots = 0;
@@ -570,6 +572,7 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
}
(*tbl)->_tuple_desc_map[tdesc.id] = desc;
+ (*tbl)->_row_tuples.emplace_back(tdesc.id);
}
for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) {
@@ -622,16 +625,6 @@ SlotDescriptor* DescriptorTbl::get_slot_descriptor(SlotId id) const {
}
}
-// return all registered tuple descriptors
-void DescriptorTbl::get_tuple_descs(std::vector<TupleDescriptor*>* descs) const {
- descs->clear();
-
- for (TupleDescriptorMap::const_iterator i = _tuple_desc_map.begin(); i != _tuple_desc_map.end();
- ++i) {
- descs->push_back(i->second);
- }
-}
-
bool SlotDescriptor::layout_equals(const SlotDescriptor& other_desc) const {
if (type().type != other_desc.type().type) return false;
if (is_nullable() != other_desc.is_nullable()) return false;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index a806a537c5..ee18f8a450 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -149,7 +149,7 @@ private:
class TableDescriptor {
public:
TableDescriptor(const TTableDescriptor& tdesc);
- virtual ~TableDescriptor() {}
+ virtual ~TableDescriptor() = default;
int num_cols() const { return _num_cols; }
int num_clustering_cols() const { return _num_clustering_cols; }
virtual std::string debug_string() const;
@@ -173,14 +173,14 @@ private:
class OlapTableDescriptor : public TableDescriptor {
public:
OlapTableDescriptor(const TTableDescriptor& tdesc);
- virtual std::string debug_string() const;
+ std::string debug_string() const override;
};
class SchemaTableDescriptor : public TableDescriptor {
public:
SchemaTableDescriptor(const TTableDescriptor& tdesc);
- virtual ~SchemaTableDescriptor();
- virtual std::string debug_string() const;
+ ~SchemaTableDescriptor() override;
+ std::string debug_string() const override;
TSchemaTableType::type schema_table_type() const { return _schema_table_type; }
private:
@@ -190,8 +190,8 @@ private:
class BrokerTableDescriptor : public TableDescriptor {
public:
BrokerTableDescriptor(const TTableDescriptor& tdesc);
- virtual ~BrokerTableDescriptor();
- virtual std::string debug_string() const;
+ ~BrokerTableDescriptor() override;
+ std::string debug_string() const override;
private:
};
@@ -199,8 +199,8 @@ private:
class HiveTableDescriptor : public TableDescriptor {
public:
HiveTableDescriptor(const TTableDescriptor& tdesc);
- virtual ~HiveTableDescriptor();
- virtual std::string debug_string() const;
+ ~HiveTableDescriptor() override;
+ std::string debug_string() const override;
private:
};
@@ -208,8 +208,8 @@ private:
class IcebergTableDescriptor : public TableDescriptor {
public:
IcebergTableDescriptor(const TTableDescriptor& tdesc);
- virtual ~IcebergTableDescriptor();
- virtual std::string debug_string() const;
+ ~IcebergTableDescriptor() override;
+ std::string debug_string() const override;
private:
};
@@ -217,8 +217,8 @@ private:
class EsTableDescriptor : public TableDescriptor {
public:
EsTableDescriptor(const TTableDescriptor& tdesc);
- virtual ~EsTableDescriptor();
- virtual std::string debug_string() const;
+ ~EsTableDescriptor() override;
+ std::string debug_string() const override;
private:
};
@@ -226,7 +226,7 @@ private:
class MySQLTableDescriptor : public TableDescriptor {
public:
MySQLTableDescriptor(const TTableDescriptor& tdesc);
- virtual std::string debug_string() const;
+ std::string debug_string() const override;
const std::string mysql_db() const { return _mysql_db; }
const std::string mysql_table() const { return _mysql_table; }
const std::string host() const { return _host; }
@@ -248,7 +248,7 @@ private:
class ODBCTableDescriptor : public TableDescriptor {
public:
ODBCTableDescriptor(const TTableDescriptor& tdesc);
- virtual std::string debug_string() const;
+ std::string debug_string() const override;
const std::string db() const { return _db; }
const std::string table() const { return _table; }
const std::string host() const { return _host; }
@@ -348,22 +348,32 @@ public:
TableDescriptor* get_table_descriptor(TableId id) const;
TupleDescriptor* get_tuple_descriptor(TupleId id) const;
SlotDescriptor* get_slot_descriptor(SlotId id) const;
+ const std::vector<TTupleId>& get_row_tuples() const { return _row_tuples; }
// return all registered tuple descriptors
- void get_tuple_descs(std::vector<TupleDescriptor*>* descs) const;
+ std::vector<TupleDescriptor*> get_tuple_descs() const {
+ std::vector<TupleDescriptor*> descs;
+
+ for (auto it : _tuple_desc_map) {
+ descs.push_back(it.second);
+ }
+
+ return descs;
+ }
std::string debug_string() const;
private:
- typedef std::unordered_map<TableId, TableDescriptor*> TableDescriptorMap;
- typedef std::unordered_map<TupleId, TupleDescriptor*> TupleDescriptorMap;
- typedef std::unordered_map<SlotId, SlotDescriptor*> SlotDescriptorMap;
+ using TableDescriptorMap = std::unordered_map<TableId, TableDescriptor*>;
+ using TupleDescriptorMap = std::unordered_map<TupleId, TupleDescriptor*>;
+ using SlotDescriptorMap = std::unordered_map<SlotId, SlotDescriptor*>;
TableDescriptorMap _tbl_desc_map;
TupleDescriptorMap _tuple_desc_map;
SlotDescriptorMap _slot_desc_map;
+ std::vector<TTupleId> _row_tuples;
- DescriptorTbl() : _tbl_desc_map(), _tuple_desc_map(), _slot_desc_map() {}
+ DescriptorTbl() = default;
};
// Records positions of tuples within row produced by ExecNode.
@@ -378,6 +388,11 @@ public:
RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
const std::vector<bool>& nullable_tuples);
+ static RowDescriptor create_default(const DescriptorTbl& desc_tbl,
+ const std::vector<bool>& nullable_tuples) {
+ return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples);
+ }
+
// standard copy c'tor, made explicit here
RowDescriptor(const RowDescriptor& desc)
: _tuple_desc_map(desc._tuple_desc_map),
@@ -399,7 +414,7 @@ public:
RowDescriptor(const RowDescriptor& lhs_row_desc, const RowDescriptor& rhs_row_desc);
// dummy descriptor, needed for the JNI EvalPredicate() function
- RowDescriptor() {}
+ RowDescriptor() = default;
// Returns total size in bytes.
// TODO: also take avg string lengths into account, ie, change this
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 482b56d378..6bbaabd1ea 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -130,6 +130,20 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
}
+RuntimeState::RuntimeState()
+ : _profile("<unnamed>"),
+ _obj_pool(new ObjectPool()),
+ _data_stream_recvrs_pool(new ObjectPool()),
+ _unreported_error_idx(0),
+ _is_cancelled(false),
+ _per_fragment_instance_idx(0) {
+ _query_options.batch_size = DEFAULT_BATCH_SIZE;
+ _timezone = TimezoneUtils::default_time_zone;
+ _timestamp_ms = 0;
+ TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
+ _exec_env = ExecEnv::GetInstance();
+}
+
RuntimeState::~RuntimeState() {
_block_mgr2.reset();
// close error log file
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index eed32d8b82..ba07ecbf38 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -75,6 +75,9 @@ public:
// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);
+ // for job task only
+ RuntimeState();
+
// Empty d'tor to avoid issues with unique_ptr.
~RuntimeState();
@@ -105,7 +108,7 @@ public:
std::shared_ptr<ObjectPool> obj_pool_ptr() const { return _obj_pool; }
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
- void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
+ void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int batch_size() const { return _query_options.batch_size; }
bool abort_on_error() const { return _query_options.abort_on_error; }
bool abort_on_default_limit_exceeded() const {
@@ -188,7 +191,7 @@ public:
int64_t backend_id() const { return _backend_id; }
void set_be_number(int be_number) { _be_number = be_number; }
- int be_number(void) { return _be_number; }
+ int be_number(void) const { return _be_number; }
// Sets _process_status with err_msg if no error has been set yet.
void set_process_status(const std::string& err_msg) {
@@ -239,7 +242,7 @@ public:
void set_load_job_id(int64_t job_id) { _load_job_id = job_id; }
- const int64_t load_job_id() { return _load_job_id; }
+ const int64_t load_job_id() const { return _load_job_id; }
// we only initialize object for load jobs
void set_load_error_hub_info(const TLoadErrorHubInfo& hub_info) {
@@ -312,17 +315,21 @@ public:
ReservationTracker* instance_buffer_reservation() { return _instance_buffer_reservation.get(); }
- int64_t min_reservation() { return _query_options.min_reservation; }
+ int64_t min_reservation() const { return _query_options.min_reservation; }
- int64_t max_reservation() { return _query_options.max_reservation; }
+ int64_t max_reservation() const { return _query_options.max_reservation; }
- bool disable_stream_preaggregations() { return _query_options.disable_stream_preaggregations; }
+ bool disable_stream_preaggregations() const {
+ return _query_options.disable_stream_preaggregations;
+ }
bool enable_spill() const { return _query_options.enable_spilling; }
- int32_t runtime_filter_wait_time_ms() { return _query_options.runtime_filter_wait_time_ms; }
+ int32_t runtime_filter_wait_time_ms() const {
+ return _query_options.runtime_filter_wait_time_ms;
+ }
- int32_t runtime_filter_max_in_num() { return _query_options.runtime_filter_max_in_num; }
+ int32_t runtime_filter_max_in_num() const { return _query_options.runtime_filter_max_in_num; }
bool enable_vectorized_exec() const { return _query_options.enable_vectorized_engine; }
@@ -387,7 +394,7 @@ private:
// _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
RuntimeProfile _profile;
- DescriptorTbl* _desc_tbl;
+ const DescriptorTbl* _desc_tbl;
std::shared_ptr<ObjectPool> _obj_pool;
// runtime filter
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index b476715612..e5b82fc939 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -24,8 +24,7 @@
namespace doris {
-typedef void (*ERRCALLBACK)();
-
+using ERRCALLBACK = void (*)();
struct ConsumeErrCallBackInfo {
std::string cancel_msg;
bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit.
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index ea8b952f8c..b0b6231cbb 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -166,7 +166,9 @@ public:
/// Appends one element from other column with the same type multiple times.
virtual void insert_many_from(const IColumn& src, size_t position, size_t length) {
- for (size_t i = 0; i < length; ++i) insert_from(src, position);
+ for (size_t i = 0; i < length; ++i) {
+ insert_from(src, position);
+ }
}
/// Appends a batch elements from other column with the same type
@@ -199,6 +201,12 @@ public:
LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name();
}
+ void insert_many_data(const char* pos, size_t length, size_t data_num) {
+ for (size_t i = 0; i < data_num; ++i) {
+ insert_data(pos, length);
+ }
+ }
+
/// Appends "default value".
/// Is used when there are need to increase column size, but inserting value doesn't make sense.
/// For example, ColumnNullable(Nested) absolutely ignores values of nested column if it is marked as NULL.
@@ -206,7 +214,9 @@ public:
/// Appends "default value" multiple times.
virtual void insert_many_defaults(size_t length) {
- for (size_t i = 0; i < length; ++i) insert_default();
+ for (size_t i = 0; i < length; ++i) {
+ insert_default();
+ }
}
virtual void insert_elements(void* elements, size_t num) {
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index aeeb84c679..f929da979d 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -17,12 +17,9 @@
#include "vec/olap/block_reader.h"
-#include "olap/row_block.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/schema.h"
-#include "olap/storage_engine.h"
+#include "common/status.h"
+#include "olap/olap_common.h"
#include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/olap/vcollect_iterator.h"
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 91ae5f670f..ad2d27d1d8 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -19,13 +19,9 @@
#include <parallel_hashmap/phmap.h>
-#include "olap/collect_iterator.h"
#include "olap/reader.h"
#include "olap/rowset/rowset_reader.h"
-#include "olap/tablet.h"
#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/aggregate_functions/aggregate_function_reader.h"
-#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/olap/vcollect_iterator.h"
namespace doris {
@@ -50,9 +46,6 @@ public:
}
private:
- friend class VCollectIterator;
- friend class DeleteHandler;
-
// Directly read row from rowset and pass to upper caller. No need to do aggregation.
// This is usually used for DUPLICATE KEY tables
Status _direct_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof);
diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp
index 7bbf3533ab..019c7635f8 100644
--- a/be/test/olap/schema_change_test.cpp
+++ b/be/test/olap/schema_change_test.cpp
@@ -996,7 +996,7 @@ TEST_F(TestColumn, ConvertIntToBitmap) {
TabletSchema mv_tablet_schema;
mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
- RowBlockChanger row_block_changer(mv_tablet_schema);
+ RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
column_mapping->ref_column = 0;
column_mapping = row_block_changer.get_mutable_column_mapping(1);
@@ -1079,7 +1079,7 @@ TEST_F(TestColumn, ConvertCharToHLL) {
TabletSchema mv_tablet_schema;
mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
- RowBlockChanger row_block_changer(mv_tablet_schema);
+ RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
column_mapping->ref_column = 0;
column_mapping = row_block_changer.get_mutable_column_mapping(1);
@@ -1160,7 +1160,7 @@ TEST_F(TestColumn, ConvertCharToCount) {
TabletSchema mv_tablet_schema;
mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
- RowBlockChanger row_block_changer(mv_tablet_schema);
+ RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
column_mapping->ref_column = 0;
column_mapping = row_block_changer.get_mutable_column_mapping(1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 9a66f10772..a05863d9ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -19,10 +19,13 @@ package org.apache.doris.alter;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -363,14 +366,22 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
}
}
+ List<Column> fullSchema = tbl.getBaseSchema(true);
+ DescriptorTable descTable = new DescriptorTable();
+ for (Column column : fullSchema) {
+ TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+ SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
+ destSlotDesc.setIsMaterialized(true);
+ destSlotDesc.setColumn(column);
+ destSlotDesc.setIsNullable(column.isAllowNull());
+ }
+
List<Replica> rollupReplicas = rollupTablet.getReplicas();
for (Replica rollupReplica : rollupReplicas) {
- AlterReplicaTask rollupTask = new AlterReplicaTask(
- rollupReplica.getBackendId(), dbId, tableId, partitionId,
- rollupIndexId, baseIndexId,
- rollupTabletId, baseTabletId, rollupReplica.getId(),
- rollupSchemaHash, baseSchemaHash,
- visibleVersion, jobId, JobType.ROLLUP, defineExprs);
+ AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId,
+ partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
+ rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
+ JobType.ROLLUP, defineExprs, descTable);
rollupBatchTask.addTask(rollupTask);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 411ab275a5..4f38cdafda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -17,6 +17,11 @@
package org.apache.doris.alter;
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@@ -375,13 +380,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.readLock();
try {
+ Map<String, Column> indexColumnMap = Maps.newHashMap();
+ for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
+ for (Column column : entry.getValue()) {
+ indexColumnMap.put(column.getName(), column);
+ }
+ }
+
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
Preconditions.checkNotNull(partition, partitionId);
- // the schema change task will transform the data before visible version(included).
+ // the schema change task will transform the data before visible
+ // version(included).
long visibleVersion = partition.getVisibleVersion();
Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
@@ -389,6 +402,32 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
long shadowIdxId = entry.getKey();
MaterializedIndex shadowIdx = entry.getValue();
+ Map<String, Expr> defineExprs = Maps.newHashMap();
+
+ List<Column> fullSchema = tbl.getBaseSchema(true);
+ DescriptorTable descTable = new DescriptorTable();
+ for (Column column : fullSchema) {
+ TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+ SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
+ destSlotDesc.setIsMaterialized(true);
+ destSlotDesc.setColumn(column);
+ destSlotDesc.setIsNullable(column.isAllowNull());
+
+ if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName())) {
+ Column newColumn = indexColumnMap
+ .get(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName());
+ if (newColumn.getType() != column.getType()) {
+ try {
+ defineExprs.put(column.getName(),
+ new SlotRef(destSlotDesc).castTo(newColumn.getType()));
+ } catch (AnalysisException e) {
+ throw new AlterCancelException(e.getMessage());
+ }
+ }
+ }
+
+ }
+
long originIdxId = indexIdMap.get(shadowIdxId);
int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
@@ -398,12 +437,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
List<Replica> shadowReplicas = shadowTablet.getReplicas();
for (Replica shadowReplica : shadowReplicas) {
- AlterReplicaTask rollupTask = new AlterReplicaTask(
- shadowReplica.getBackendId(), dbId, tableId, partitionId,
- shadowIdxId, originIdxId,
- shadowTabletId, originTabletId, shadowReplica.getId(),
- shadowSchemaHash, originSchemaHash,
- visibleVersion, jobId, JobType.SCHEMA_CHANGE);
+ AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
+ tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
+ shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
+ JobType.SCHEMA_CHANGE, defineExprs, descTable);
schemaChangeBatchTask.addTask(rollupTask);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index b965ec4b38..e94e3ef290 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -800,11 +800,8 @@ public class Analyzer {
}
result = globalState.descTbl.addSlotDescriptor(d);
result.setColumn(col);
- if (col.isAllowNull() || isOuterJoined(d.getId())) {
- result.setIsNullable(true);
- } else {
- result.setIsNullable(false);
- }
+ result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId()));
+
slotRefMap.put(key, result);
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 7dd92af01c..579d9d1512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -352,11 +352,7 @@ public class InsertStmt extends DdlStmt {
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
- if (col.isAllowNull()) {
- slotDesc.setIsNullable(true);
- } else {
- slotDesc.setIsNullable(false);
- }
+ slotDesc.setIsNullable(col.isAllowNull());
}
// will use it during create load job
indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 240edf92fc..b91c839441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -112,11 +112,7 @@ public class LoadingTaskPlanner {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
- if (col.isAllowNull()) {
- slotDesc.setIsNullable(true);
- } else {
- slotDesc.setIsNullable(false);
- }
+ slotDesc.setIsNullable(col.isAllowNull());
}
// Generate plan trees
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 37bae6bd07..cbc8f64f1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -881,11 +881,7 @@ public class SparkLoadJob extends BulkLoadJob {
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
- if (column.isAllowNull()) {
- destSlotDesc.setIsNullable(true);
- } else {
- destSlotDesc.setIsNullable(false);
- }
+ destSlotDesc.setIsNullable(column.isAllowNull());
}
initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc);
initTDescriptorTable(descTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index 501f5bcc76..887e2b33e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -113,11 +113,7 @@ public class UpdatePlanner extends Planner {
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
- if (col.isAllowNull()) {
- slotDesc.setIsNullable(true);
- } else {
- slotDesc.setIsNullable(false);
- }
+ slotDesc.setIsNullable(col.isAllowNull());
}
targetTupleDesc.computeStatAndMemLayout();
return targetTupleDesc;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index f25b01d2f1..4235ce0ded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -18,6 +18,7 @@
package org.apache.doris.task;
import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.thrift.TAlterMaterializedViewParam;
@@ -46,21 +47,16 @@ public class AlterReplicaTask extends AgentTask {
private AlterJobV2.JobType jobType;
private Map<String, Expr> defineExprs;
-
- public AlterReplicaTask(long backendId, long dbId, long tableId,
- long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
- long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
- long version, long jobId, AlterJobV2.JobType jobType) {
- this(backendId, dbId, tableId, partitionId,
- rollupIndexId, baseIndexId, rollupTabletId,
- baseTabletId, newReplicaId, newSchemaHash, baseSchemaHash,
- version, jobId, jobType, null);
- }
-
- public AlterReplicaTask(long backendId, long dbId, long tableId,
- long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
- long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
- long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs) {
+ private DescriptorTable descTable;
+
+ /**
+ * AlterReplicaTask constructor.
+ *
+ */
+ public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
+ long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
+ int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
+ DescriptorTable descTable) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
this.baseTabletId = baseTabletId;
@@ -74,6 +70,7 @@ public class AlterReplicaTask extends AgentTask {
this.jobType = jobType;
this.defineExprs = defineExprs;
+ this.descTable = descTable;
}
public long getBaseTabletId() {
@@ -117,6 +114,7 @@ public class AlterReplicaTask extends AgentTask {
req.addToMaterializedViewParams(mvParam);
}
}
+ req.setDescTbl(descTable.toThrift());
return req;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 44e6e2ec62..7ffc14dd06 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -182,11 +182,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
TStreamLoadPutRequest request = getBaseRequest();
@@ -230,11 +226,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
TStreamLoadPutRequest request = getBaseRequest();
@@ -259,11 +251,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
TStreamLoadPutRequest request = getBaseRequest();
@@ -288,11 +276,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -332,11 +316,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -379,11 +359,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -434,11 +410,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
TStreamLoadPutRequest request = getBaseRequest();
@@ -464,11 +436,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
TStreamLoadPutRequest request = getBaseRequest();
@@ -494,11 +462,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -548,11 +512,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -594,11 +554,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -646,11 +602,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -701,11 +653,7 @@ public class StreamLoadScanNodeTest {
SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -757,11 +705,7 @@ public class StreamLoadScanNodeTest {
System.out.println(column);
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
@@ -823,11 +767,7 @@ public class StreamLoadScanNodeTest {
slot.setColumn(column);
slot.setIsMaterialized(true);
- if (column.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
+ slot.setIsNullable(column.isAllowNull());
}
new Expectations() {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index e695d7a737..740db1fd08 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -151,6 +151,7 @@ struct TAlterTabletReqV2 {
6: optional Types.TVersionHash alter_version_hash // Deprecated
7: optional list<TAlterMaterializedViewParam> materialized_view_params
8: optional TAlterTabletType alter_tablet_type = TAlterTabletType.SCHEMA_CHANGE
+ 9: optional Descriptors.TDescriptorTable desc_tbl
}
struct TAlterMaterializedViewParam {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org