You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/07/11 02:18:40 UTC
[incubator-doris] branch master updated: [Bug]Fix some schema
change not work right (#4009)
This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d7893f0 [Bug]Fix some schema change not work right (#4009)
d7893f0 is described below
commit d7893f0fa7c9f68a50f438080891dacff0ad1408
Author: WingC <10...@qq.com>
AuthorDate: Fri Jul 10 21:18:29 2020 -0500
[Bug]Fix some schema change not work right (#4009)
[Bug]Fix some schema change not work right
This CL mainly fixes some schema changes to the varchar type that did not work correctly
because the logic check was forgotten. It also adds ConvertTypeResolver to register the
supported type conversions, so that the logic check cannot be forgotten again.
---
be/src/olap/schema_change.cpp | 708 +++++++++++----------
be/src/olap/schema_change.h | 130 ++--
.../sql-statements/Data Definition/ALTER TABLE.md | 1 -
.../sql-statements/Data Definition/ALTER TABLE.md | 1 -
.../main/java/org/apache/doris/catalog/Column.java | 6 +
.../java/org/apache/doris/catalog/ColumnType.java | 9 +-
6 files changed, 419 insertions(+), 436 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 54b9794..4458081 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -23,21 +23,21 @@
#include <algorithm>
#include <vector>
+#include "agent/cgroups_mgr.h"
+#include "common/resource_tls.h"
#include "olap/merger.h"
-#include "olap/storage_engine.h"
-#include "olap/tablet.h"
+#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/wrapper_field.h"
-#include "olap/row.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_id_generator.h"
-#include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
-#include "common/resource_tls.h"
-#include "agent/cgroups_mgr.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/wrapper_field.h"
#include "runtime/exec_env.h"
#include "runtime/heartbeat_flags.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
using std::deque;
using std::list;
@@ -70,10 +70,8 @@ public:
explicit RowBlockMerger(TabletSharedPtr tablet);
virtual ~RowBlockMerger();
- bool merge(
- const std::vector<RowBlock*>& row_block_arr,
- RowsetWriter* rowset_writer,
- uint64_t* merged_rows);
+ bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
+ uint64_t* merged_rows);
private:
struct MergeElement {
@@ -93,9 +91,8 @@ private:
std::priority_queue<MergeElement> _heap;
};
-
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
- const TabletSharedPtr &base_tablet) {
+ const TabletSharedPtr& base_tablet) {
_schema_mapping.resize(tablet_schema.num_columns());
}
@@ -124,81 +121,141 @@ ColumnMapping* RowBlockChanger::get_mutable_column_mapping(size_t column_index)
return &(_schema_mapping[column_index]);
}
-#define TYPE_REINTERPRET_CAST(FromType, ToType) \
-{ \
- size_t row_num = ref_block->row_block_info().row_num; \
- for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
- if (is_data_left_vec[row] != 0) { \
- char* ref_ptr = ref_block->field_ptr(row, ref_column); \
- char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
- *new_ptr = *ref_ptr; \
- *(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \
- } \
- } \
- break; \
-}
+#define TYPE_REINTERPRET_CAST(FromType, ToType) \
+ { \
+ size_t row_num = ref_block->row_block_info().row_num; \
+ for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
+ if (is_data_left_vec[row] != 0) { \
+ char* ref_ptr = ref_block->field_ptr(row, ref_column); \
+ char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
+ *new_ptr = *ref_ptr; \
+ *(ToType*)(new_ptr + 1) = *(FromType*)(ref_ptr + 1); \
+ } \
+ } \
+ break; \
+ }
+
+#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \
+ { \
+ size_t row_num = ref_block->row_block_info().row_num; \
+ for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
+ if (is_data_left_vec[row] != 0) { \
+ char* ref_ptr = ref_block->field_ptr(row, ref_column); \
+ char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
+ *new_ptr = *ref_ptr; \
+ ToType new_value = *(FromType*)(ref_ptr + 1); \
+ memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \
+ } \
+ } \
+ break; \
+ }
+
+#define CONVERT_FROM_TYPE(from_type) \
+ { \
+ switch (mutable_block->tablet_schema().column(i).type()) { \
+ case OLAP_FIELD_TYPE_TINYINT: \
+ TYPE_REINTERPRET_CAST(from_type, int8_t); \
+ case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: \
+ TYPE_REINTERPRET_CAST(from_type, uint8_t); \
+ case OLAP_FIELD_TYPE_SMALLINT: \
+ TYPE_REINTERPRET_CAST(from_type, int16_t); \
+ case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: \
+ TYPE_REINTERPRET_CAST(from_type, uint16_t); \
+ case OLAP_FIELD_TYPE_INT: \
+ TYPE_REINTERPRET_CAST(from_type, int32_t); \
+ case OLAP_FIELD_TYPE_UNSIGNED_INT: \
+ TYPE_REINTERPRET_CAST(from_type, uint32_t); \
+ case OLAP_FIELD_TYPE_BIGINT: \
+ TYPE_REINTERPRET_CAST(from_type, int64_t); \
+ case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: \
+ TYPE_REINTERPRET_CAST(from_type, uint64_t); \
+ case OLAP_FIELD_TYPE_LARGEINT: \
+ LARGEINT_REINTERPRET_CAST(from_type, int128_t); \
+ case OLAP_FIELD_TYPE_DOUBLE: \
+ TYPE_REINTERPRET_CAST(from_type, double); \
+ default: \
+ LOG(WARNING) << "the column type which was altered to was unsupported." \
+ << " origin_type=" \
+ << ref_block->tablet_schema().column(ref_column).type() \
+ << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); \
+ return false; \
+ } \
+ break; \
+ }
+
+#define ASSIGN_DEFAULT_VALUE(length) \
+ case length: { \
+ for (size_t row = 0; row < ref_block.row_block_info().row_num; ++row) { \
+ memcpy(buf, _schema_mapping[i].default_value->ptr(), length); \
+ buf += length; \
+ } \
+ break; \
+ }
-#define LARGEINT_REINTERPRET_CAST(FromType, ToType) \
-{ \
- size_t row_num = ref_block->row_block_info().row_num; \
- for (size_t row = 0, mutable_row = 0; row < row_num; ++row) { \
- if (is_data_left_vec[row] != 0) { \
- char* ref_ptr = ref_block->field_ptr(row, ref_column); \
- char* new_ptr = mutable_block->field_ptr(mutable_row++, i); \
- *new_ptr = *ref_ptr; \
- ToType new_value = *(FromType*)(ref_ptr + 1); \
- memcpy(new_ptr + 1, &new_value, sizeof(ToType)); \
- } \
- } \
- break; \
-}
+struct ConvertTypeMapHash {
+ size_t operator()(const std::pair<FieldType, FieldType>& pair) const {
+ return (pair.first + 31) ^ pair.second;
+ }
+};
-#define CONVERT_FROM_TYPE(from_type) \
-{ \
- switch (mutable_block->tablet_schema().column(i).type()) {\
- case OLAP_FIELD_TYPE_TINYINT: \
- TYPE_REINTERPRET_CAST(from_type, int8_t); \
- case OLAP_FIELD_TYPE_UNSIGNED_TINYINT: \
- TYPE_REINTERPRET_CAST(from_type, uint8_t); \
- case OLAP_FIELD_TYPE_SMALLINT: \
- TYPE_REINTERPRET_CAST(from_type, int16_t); \
- case OLAP_FIELD_TYPE_UNSIGNED_SMALLINT: \
- TYPE_REINTERPRET_CAST(from_type, uint16_t); \
- case OLAP_FIELD_TYPE_INT: \
- TYPE_REINTERPRET_CAST(from_type, int32_t); \
- case OLAP_FIELD_TYPE_UNSIGNED_INT: \
- TYPE_REINTERPRET_CAST(from_type, uint32_t); \
- case OLAP_FIELD_TYPE_BIGINT: \
- TYPE_REINTERPRET_CAST(from_type, int64_t); \
- case OLAP_FIELD_TYPE_UNSIGNED_BIGINT: \
- TYPE_REINTERPRET_CAST(from_type, uint64_t); \
- case OLAP_FIELD_TYPE_LARGEINT: \
- LARGEINT_REINTERPRET_CAST(from_type, int128_t); \
- case OLAP_FIELD_TYPE_DOUBLE: \
- TYPE_REINTERPRET_CAST(from_type, double); \
- default: \
- LOG(WARNING) << "the column type which was altered to was unsupported." \
- << " origin_type=" << ref_block->tablet_schema().column(ref_column).type() \
- << ", alter_type=" << mutable_block->tablet_schema().column(i).type(); \
- return false; \
- } \
- break; \
-}
+class ConvertTypeResolver {
+ DECLARE_SINGLETON(ConvertTypeResolver);
-#define ASSIGN_DEFAULT_VALUE(length) \
- case length: { \
- for (size_t row = 0; row < ref_block.row_block_info().row_num; ++row) { \
- memcpy(buf, _schema_mapping[i].default_value->ptr(), length); \
- buf += length; \
- } \
- break; \
+public:
+ bool get_convert_type_info(const FieldType from_type, const FieldType to_type) const {
+ return _convert_type_set.find(std::make_pair(from_type, to_type)) !=
+ _convert_type_set.end();
+ }
+
+ template <FieldType from_type, FieldType to_type>
+ void add_convert_type_mapping() {
+ _convert_type_set.emplace(std::make_pair(from_type, to_type));
}
-bool RowBlockChanger::change_row_block(
- const RowBlock* ref_block,
- int32_t data_version,
- RowBlock* mutable_block,
- uint64_t* filtered_rows) const {
+private:
+ typedef std::pair<FieldType, FieldType> convert_type_pair;
+ std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set;
+
+ DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver);
+};
+
+ConvertTypeResolver::ConvertTypeResolver() {
+ // supported type convert should annotate in doc:
+ // http://doris.incubator.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description
+ // If type convert is supported here, you should check fe/src/main/java/org/apache/doris/catalog/ColumnType.java to supported it either
+ // from varchar type
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_SMALLINT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_INT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_BIGINT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_LARGEINT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_FLOAT>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DOUBLE>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_DATE>();
+
+ // to varchar type
+ add_convert_type_mapping<OLAP_FIELD_TYPE_TINYINT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_SMALLINT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_BIGINT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_LARGEINT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_DOUBLE, OLAP_FIELD_TYPE_VARCHAR>();
+ add_convert_type_mapping<OLAP_FIELD_TYPE_DECIMAL, OLAP_FIELD_TYPE_VARCHAR>();
+
+ add_convert_type_mapping<OLAP_FIELD_TYPE_DATE, OLAP_FIELD_TYPE_DATETIME>();
+
+ add_convert_type_mapping<OLAP_FIELD_TYPE_DATETIME, OLAP_FIELD_TYPE_DATE>();
+
+ add_convert_type_mapping<OLAP_FIELD_TYPE_FLOAT, OLAP_FIELD_TYPE_DOUBLE>();
+
+ add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>();
+}
+
+ConvertTypeResolver::~ConvertTypeResolver() {}
+
+bool RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data_version,
+ RowBlock* mutable_block, uint64_t* filtered_rows) const {
if (mutable_block == nullptr) {
LOG(FATAL) << "mutable block is uninitialized.";
return false;
@@ -262,7 +319,8 @@ bool RowBlockChanger::change_row_block(
MemPool* mem_pool = mutable_block->mem_pool();
// b. 根据前面的过滤信息,只对还标记为1的处理
- for (size_t i = 0, len = mutable_block->tablet_schema().num_columns(); !filter_all && i < len; ++i) {
+ for (size_t i = 0, len = mutable_block->tablet_schema().num_columns(); !filter_all && i < len;
+ ++i) {
int32_t ref_column = _schema_mapping[i].ref_column;
if (_schema_mapping[i].ref_column >= 0) {
@@ -273,7 +331,7 @@ bool RowBlockChanger::change_row_block(
if (newtype == reftype) {
// 效率低下,也可以直接计算变长域拷贝,但仍然会破坏封装
for (size_t row_index = 0, new_row_index = 0;
- row_index < ref_block->row_block_info().row_num; ++row_index) {
+ row_index < ref_block->row_block_info().row_num; ++row_index) {
// 不需要的row,每次处理到这个row时就跳过
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
@@ -286,7 +344,6 @@ bool RowBlockChanger::change_row_block(
if (true == read_helper.is_null(ref_column)) {
write_helper.set_null(i);
} else {
-
write_helper.set_not_null(i);
if (newtype == OLAP_FIELD_TYPE_CHAR) {
// if modify length of CHAR type, the size of slice should be equal
@@ -298,7 +355,8 @@ bool RowBlockChanger::change_row_block(
size_t copy_size = (size < src->size) ? size : src->size;
memcpy(buf, src->data, copy_size);
Slice dst(buf, size);
- write_helper.set_field_content(i, reinterpret_cast<char*>(&dst), mem_pool);
+ write_helper.set_field_content(i, reinterpret_cast<char*>(&dst),
+ mem_pool);
} else {
char* src = read_helper.cell_ptr(ref_column);
write_helper.set_field_content(i, src, mem_pool);
@@ -310,7 +368,7 @@ bool RowBlockChanger::change_row_block(
} else if (newtype == OLAP_FIELD_TYPE_VARCHAR && reftype == OLAP_FIELD_TYPE_CHAR) {
// 效率低下,也可以直接计算变长域拷贝,但仍然会破坏封装
for (size_t row_index = 0, new_row_index = 0;
- row_index < ref_block->row_block_info().row_num; ++row_index) {
+ row_index < ref_block->row_block_info().row_num; ++row_index) {
// 不需要的row,每次处理到这个row时就跳过
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
@@ -337,20 +395,9 @@ bool RowBlockChanger::change_row_block(
write_helper.set_field_content(i, reinterpret_cast<char*>(slice), mem_pool);
}
}
- } else if ((reftype == OLAP_FIELD_TYPE_FLOAT && newtype == OLAP_FIELD_TYPE_DOUBLE)
- || (reftype == OLAP_FIELD_TYPE_INT && newtype == OLAP_FIELD_TYPE_DATE)
- || (reftype == OLAP_FIELD_TYPE_DATE && newtype == OLAP_FIELD_TYPE_DATETIME)
- || (reftype == OLAP_FIELD_TYPE_DATETIME && newtype == OLAP_FIELD_TYPE_DATE)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_DATE)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_FLOAT)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_DOUBLE)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_TINYINT)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_SMALLINT)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_INT)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_BIGINT)
- || (reftype == OLAP_FIELD_TYPE_VARCHAR && newtype == OLAP_FIELD_TYPE_LARGEINT)) {
+ } else if (ConvertTypeResolver::instance()->get_convert_type_info(reftype, newtype)) {
for (size_t row_index = 0, new_row_index = 0;
- row_index < ref_block->row_block_info().row_num; ++row_index) {
+ row_index < ref_block->row_block_info().row_num; ++row_index) {
// Skip filtered rows
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
@@ -363,10 +410,13 @@ bool RowBlockChanger::change_row_block(
write_helper.set_not_null(i);
const Field* ref_field = read_helper.column_schema(ref_column);
char* ref_value = read_helper.cell_ptr(ref_column);
- OLAPStatus st = write_helper.convert_from(i, ref_value, ref_field->type_info(), mem_pool);
+ OLAPStatus st = write_helper.convert_from(i, ref_value,
+ ref_field->type_info(), mem_pool);
if (st != OLAPStatus::OLAP_SUCCESS) {
- LOG(WARNING) << "the column type which was altered from was unsupported."
- << "status:" << st << ", from_type=" << reftype << ", to_type=" << newtype;
+ LOG(WARNING)
+ << "the column type which was altered from was unsupported."
+ << "status:" << st << ", from_type=" << reftype
+ << ", to_type=" << newtype;
return false;
}
}
@@ -394,21 +444,23 @@ bool RowBlockChanger::change_row_block(
CONVERT_FROM_TYPE(uint64_t);
default:
LOG(WARNING) << "the column type which was altered from was unsupported."
- << " from_type=" << ref_block->tablet_schema().column(ref_column).type();
+ << " from_type="
+ << ref_block->tablet_schema().column(ref_column).type();
return false;
}
if (newtype < reftype) {
VLOG(3) << "type degraded while altering column. "
<< "column=" << mutable_block->tablet_schema().column(i).name()
- << ", origin_type=" << ref_block->tablet_schema().column(ref_column).type()
+ << ", origin_type="
+ << ref_block->tablet_schema().column(ref_column).type()
<< ", alter_type=" << mutable_block->tablet_schema().column(i).type();
}
}
} else {
// 新增列,写入默认值
for (size_t row_index = 0, new_row_index = 0;
- row_index < ref_block->row_block_info().row_num; ++row_index) {
+ row_index < ref_block->row_block_info().row_num; ++row_index) {
// 不需要的row,每次处理到这个row时就跳过
if (need_filter_data && is_data_left_vec[row_index] == 0) {
continue;
@@ -420,8 +472,8 @@ bool RowBlockChanger::change_row_block(
write_helper.set_null(i);
} else {
write_helper.set_not_null(i);
- write_helper.set_field_content(
- i, _schema_mapping[i].default_value->ptr(), mem_pool);
+ write_helper.set_field_content(i, _schema_mapping[i].default_value->ptr(),
+ mem_pool);
}
}
}
@@ -438,9 +490,8 @@ bool RowBlockChanger::change_row_block(
#undef TYPE_REINTERPRET_CAST
#undef ASSIGN_DEFAULT_VALUE
-RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator) :
- _row_block_allocator(row_block_allocator),
- _swap_row_block(nullptr) {}
+RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator)
+ : _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {}
RowBlockSorter::~RowBlockSorter() {
if (_swap_row_block) {
@@ -459,8 +510,9 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
_swap_row_block = nullptr;
}
- if (_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported) != OLAP_SUCCESS
- || _swap_row_block == nullptr) {
+ if (_row_block_allocator->allocate(&_swap_row_block, row_num, null_supported) !=
+ OLAP_SUCCESS ||
+ _swap_row_block == nullptr) {
LOG(WARNING) << "fail to allocate memory.";
return false;
}
@@ -478,7 +530,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
// create an list of row cursor as long as the number of rows in data block.
for (size_t i = 0; i < (*row_block)->row_block_info().row_num; ++i) {
- if ((row_cursor_list[i] = new(nothrow) RowCursor()) == nullptr) {
+ if ((row_cursor_list[i] = new (nothrow) RowCursor()) == nullptr) {
LOG(WARNING) << "failed to malloc RowCursor. size=" << sizeof(RowCursor);
goto SORT_ERR_EXIT;
}
@@ -521,11 +573,10 @@ SORT_ERR_EXIT:
return false;
}
-RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema,
- size_t memory_limitation) :
- _tablet_schema(tablet_schema),
- _memory_allocated(0),
- _memory_limitation(memory_limitation) {
+RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation)
+ : _tablet_schema(tablet_schema),
+ _memory_allocated(0),
+ _memory_limitation(memory_limitation) {
_row_len = 0;
_row_len = tablet_schema.row_size();
@@ -538,13 +589,10 @@ RowBlockAllocator::~RowBlockAllocator() {
}
}
-OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block,
- size_t num_rows,
- bool null_supported) {
+OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows, bool null_supported) {
size_t row_block_size = _row_len * num_rows;
- if (_memory_limitation > 0
- && _memory_allocated + row_block_size > _memory_limitation) {
+ if (_memory_limitation > 0 && _memory_allocated + row_block_size > _memory_limitation) {
VLOG(3) << "RowBlockAllocator::alocate() memory exceeded. "
<< "m_memory_allocated=" << _memory_allocated;
*row_block = nullptr;
@@ -552,7 +600,7 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block,
}
// TODO(lijiao) : 为什么舍弃原有的m_row_block_buffer
- *row_block = new(nothrow) RowBlock(&_tablet_schema);
+ *row_block = new (nothrow) RowBlock(&_tablet_schema);
if (*row_block == nullptr) {
LOG(WARNING) << "failed to malloc RowBlock. size=" << sizeof(RowBlock);
@@ -570,10 +618,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block,
}
_memory_allocated += row_block_size;
- VLOG(3) << "RowBlockAllocator::allocate() this=" << this
- << ", num_rows=" << num_rows
- << ", m_memory_allocated=" << _memory_allocated
- << ", row_block_addr=" << *row_block;
+ VLOG(3) << "RowBlockAllocator::allocate() this=" << this << ", num_rows=" << num_rows
+ << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << *row_block;
return res;
}
@@ -587,8 +633,7 @@ void RowBlockAllocator::release(RowBlock* row_block) {
VLOG(3) << "RowBlockAllocator::release() this=" << this
<< ", num_rows=" << row_block->capacity()
- << ", m_memory_allocated=" << _memory_allocated
- << ", row_block_addr=" << row_block;
+ << ", m_memory_allocated=" << _memory_allocated << ", row_block_addr=" << row_block;
delete row_block;
}
@@ -596,10 +641,8 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {}
RowBlockMerger::~RowBlockMerger() {}
-bool RowBlockMerger::merge(
- const vector<RowBlock*>& row_block_arr,
- RowsetWriter* rowset_writer,
- uint64_t* merged_rows) {
+bool RowBlockMerger::merge(const vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
+ uint64_t* merged_rows) {
uint64_t tmp_merged_rows = 0;
RowCursor row_cursor;
std::unique_ptr<MemTracker> tracker(new MemTracker(-1));
@@ -614,7 +657,8 @@ bool RowBlockMerger::merge(
row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
while (_heap.size() > 0) {
- init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(), agg_object_pool.get());
+ init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(),
+ agg_object_pool.get());
if (!_pop_heap()) {
goto MERGE_ERR;
@@ -671,7 +715,7 @@ bool RowBlockMerger::_make_heap(const vector<RowBlock*>& row_block_arr) {
MergeElement element;
element.row_block = row_block;
element.row_block_index = 0;
- element.row_cursor = new(nothrow) RowCursor();
+ element.row_cursor = new (nothrow) RowCursor();
if (element.row_cursor == nullptr) {
LOG(FATAL) << "failed to malloc RowCursor. size=" << sizeof(RowCursor);
@@ -707,30 +751,25 @@ bool RowBlockMerger::_pop_heap() {
return true;
}
-bool LinkedSchemaChange::process(
- RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+bool LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
+ RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet) {
OLAPStatus status = new_rowset_writer->add_rowset_for_linked_schema_change(
- rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
+ rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "fail to convert rowset."
<< ", new_tablet=" << new_tablet->full_name()
<< ", base_tablet=" << base_tablet->full_name()
- << ", version=" << new_rowset_writer->version().first
- << "-" << new_rowset_writer->version().second;
+ << ", version=" << new_rowset_writer->version().first << "-"
+ << new_rowset_writer->version().second;
return false;
}
return true;
}
-SchemaChangeDirectly::SchemaChangeDirectly(
- const RowBlockChanger& row_block_changer) :
- _row_block_changer(row_block_changer),
- _row_block_allocator(nullptr),
- _cursor(nullptr) { }
+SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer)
+ : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {}
SchemaChangeDirectly::~SchemaChangeDirectly() {
VLOG(3) << "~SchemaChangeDirectly()";
@@ -751,8 +790,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter* rowset_writer, RowBloc
}
bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
_row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0);
if (_row_block_allocator == nullptr) {
@@ -762,7 +800,7 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
}
if (nullptr == _cursor) {
- _cursor = new(nothrow) RowCursor();
+ _cursor = new (nothrow) RowCursor();
if (nullptr == _cursor) {
LOG(WARNING) << "fail to allocate row cursor.";
return false;
@@ -790,7 +828,8 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
res = rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
- << "version=" << rowset_writer->version().first << "-" << rowset_writer->version().second;
+ << "version=" << rowset_writer->version().first << "-"
+ << rowset_writer->version().second;
return false;
}
return true;
@@ -809,17 +848,16 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
rowset_reader->next_block(&ref_row_block);
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
// 注意这里强制分配和旧块等大的块(小了可能会存不下)
- if (new_row_block == nullptr
- || new_row_block->capacity() < ref_row_block->row_block_info().row_num) {
+ if (new_row_block == nullptr ||
+ new_row_block->capacity() < ref_row_block->row_block_info().row_num) {
if (new_row_block != nullptr) {
_row_block_allocator->release(new_row_block);
new_row_block = nullptr;
}
- if (OLAP_SUCCESS != _row_block_allocator->allocate(
- &new_row_block,
- ref_row_block->row_block_info().row_num,
- true)) {
+ if (OLAP_SUCCESS !=
+ _row_block_allocator->allocate(&new_row_block,
+ ref_row_block->row_block_info().row_num, true)) {
LOG(WARNING) << "failed to allocate RowBlock.";
result = false;
goto DIRECTLY_PROCESS_ERR;
@@ -830,10 +868,8 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
// 将ref改为new。这一步按道理来说确实需要等大的块,但理论上和writer无关。
uint64_t filtered_rows = 0;
- if (!_row_block_changer.change_row_block(ref_row_block,
- rowset_reader->version().second,
- new_row_block,
- &filtered_rows)) {
+ if (!_row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
+ new_row_block, &filtered_rows)) {
LOG(WARNING) << "failed to change data in row block.";
result = false;
goto DIRECTLY_PROCESS_ERR;
@@ -861,23 +897,21 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr
// Check row num changes
if (config::row_nums_check) {
- if (rowset_reader->rowset()->num_rows()
- != rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
+ if (rowset_reader->rowset()->num_rows() !=
+ rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
LOG(WARNING) << "fail to check row num! "
- << "source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
- << ", new_index_rows=" << rowset_writer->num_rows();
+ << "source_rows=" << rowset_reader->rowset()->num_rows()
+ << ", merged_rows=" << merged_rows()
+ << ", filtered_rows=" << filtered_rows()
+ << ", new_index_rows=" << rowset_writer->num_rows();
result = false;
}
LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << rowset_writer->num_rows();
} else {
LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << rowset_writer->num_rows();
}
@@ -890,10 +924,10 @@ DIRECTLY_PROCESS_ERR:
}
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
- size_t memory_limitation) :
- _row_block_changer(row_block_changer),
- _memory_limitation(memory_limitation),
- _row_block_allocator(nullptr) {
+ size_t memory_limitation)
+ : _row_block_changer(row_block_changer),
+ _memory_limitation(memory_limitation),
+ _row_block_allocator(nullptr) {
// 每次SchemaChange做外排的时候,会写一些临时版本(比如999,1000,1001),为避免Cache冲突,临时
// 版本进行2个处理:
// 1. 随机值作为VersionHash
@@ -908,13 +942,12 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
SAFE_DELETE(_row_block_allocator);
}
-bool SchemaChangeWithSorting::process(
- RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) {
+bool SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
+ RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
- _row_block_allocator = new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
+ _row_block_allocator =
+ new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
if (_row_block_allocator == nullptr) {
LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" << sizeof(RowBlockAllocator);
return false;
@@ -932,14 +965,13 @@ bool SchemaChangeWithSorting::process(
res = new_rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
- << " version=" << new_rowset_writer->version().first
- << "-" << new_rowset_writer->version().second;
+ << " version=" << new_rowset_writer->version().first << "-"
+ << new_rowset_writer->version().second;
return false;
}
return true;
}
-
bool result = true;
RowBlockSorter row_block_sorter(_row_block_allocator);
@@ -966,8 +998,9 @@ bool SchemaChangeWithSorting::process(
RowBlock* ref_row_block = nullptr;
rowset_reader->next_block(&ref_row_block);
while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
- if (OLAP_SUCCESS != _row_block_allocator->allocate(
- &new_row_block, ref_row_block->row_block_info().row_num, true)) {
+ if (OLAP_SUCCESS != _row_block_allocator->allocate(&new_row_block,
+ ref_row_block->row_block_info().row_num,
+ true)) {
LOG(WARNING) << "failed to allocate RowBlock.";
result = false;
goto SORTING_PROCESS_ERR;
@@ -986,14 +1019,11 @@ bool SchemaChangeWithSorting::process(
if (use_beta_rowset) {
new_rowset_type = BETA_ROWSET;
}
- if (!_internal_sorting(row_block_arr,
- Version(_temp_delta_versions.second,
- _temp_delta_versions.second),
- rowset_reader->version_hash(),
- new_tablet,
- new_rowset_type,
- segments_overlap,
- &rowset)) {
+ if (!_internal_sorting(
+ row_block_arr,
+ Version(_temp_delta_versions.second, _temp_delta_versions.second),
+ rowset_reader->version_hash(), new_tablet, new_rowset_type,
+ segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
goto SORTING_PROCESS_ERR;
@@ -1001,8 +1031,8 @@ bool SchemaChangeWithSorting::process(
src_rowsets.push_back(rowset);
- for (vector<RowBlock*>::iterator it = row_block_arr.begin();
- it != row_block_arr.end(); ++it) {
+ for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end();
+ ++it) {
_row_block_allocator->release(*it);
}
@@ -1014,8 +1044,7 @@ bool SchemaChangeWithSorting::process(
}
uint64_t filtered_rows = 0;
- if (!_row_block_changer.change_row_block(ref_row_block,
- rowset_reader->version().second,
+ if (!_row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
new_row_block, &filtered_rows)) {
LOG(WARNING) << "failed to change data in row block.";
result = false;
@@ -1051,11 +1080,8 @@ bool SchemaChangeWithSorting::process(
}
if (!_internal_sorting(row_block_arr,
Version(_temp_delta_versions.second, _temp_delta_versions.second),
- rowset_reader->version_hash(),
- new_tablet,
- new_rowset_type,
- segments_overlap,
- &rowset)) {
+ rowset_reader->version_hash(), new_tablet, new_rowset_type,
+ segments_overlap, &rowset)) {
LOG(WARNING) << "failed to sorting internally.";
result = false;
goto SORTING_PROCESS_ERR;
@@ -1063,8 +1089,8 @@ bool SchemaChangeWithSorting::process(
src_rowsets.push_back(rowset);
- for (vector<RowBlock*>::iterator it = row_block_arr.begin();
- it != row_block_arr.end(); ++it) {
+ for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end();
+ ++it) {
_row_block_allocator->release(*it);
}
@@ -1078,8 +1104,8 @@ bool SchemaChangeWithSorting::process(
res = new_rowset_writer->flush();
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "create empty version for schema change failed."
- << " version=" << new_rowset_writer->version().first
- << "-" << new_rowset_writer->version().second;
+ << " version=" << new_rowset_writer->version().first << "-"
+ << new_rowset_writer->version().second;
return false;
}
} else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) {
@@ -1092,8 +1118,8 @@ bool SchemaChangeWithSorting::process(
// Check row num changes
if (config::row_nums_check) {
- if (rowset_reader->rowset()->num_rows()
- != new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
+ if (rowset_reader->rowset()->num_rows() !=
+ new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
LOG(WARNING) << "fail to check row num!"
<< " source_rows=" << rowset_reader->rowset()->num_rows()
<< ", merged_rows=" << merged_rows()
@@ -1102,26 +1128,23 @@ bool SchemaChangeWithSorting::process(
result = false;
}
LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << new_rowset_writer->num_rows();
} else {
LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
- << ", merged_rows=" << merged_rows()
- << ", filtered_rows=" << filtered_rows()
+ << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
<< ", new_index_rows=" << new_rowset_writer->num_rows();
}
SORTING_PROCESS_ERR:
// remove the intermediate rowsets generated by internal sorting
- for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin();
- it != src_rowsets.end(); ++it) {
+ for (vector<RowsetSharedPtr>::iterator it = src_rowsets.begin(); it != src_rowsets.end();
+ ++it) {
StorageEngine::instance()->add_unused_rowset(*it);
}
- for (vector<RowBlock*>::iterator it = row_block_arr.begin();
- it != row_block_arr.end(); ++it) {
+ for (vector<RowBlock*>::iterator it = row_block_arr.begin(); it != row_block_arr.end(); ++it) {
_row_block_allocator->release(*it);
}
@@ -1130,8 +1153,7 @@ SORTING_PROCESS_ERR:
}
bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_block_arr,
- const Version& version,
- VersionHash version_hash,
+ const Version& version, VersionHash version_hash,
TabletSharedPtr new_tablet,
RowsetTypePB new_rowset_type,
SegmentsOverlapPB segments_overlap,
@@ -1162,10 +1184,12 @@ bool SchemaChangeWithSorting::_internal_sorting(const vector<RowBlock*>& row_blo
if (!merger.merge(row_block_arr, rowset_writer.get(), &merged_rows)) {
LOG(WARNING) << "failed to merge row blocks.";
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
return false;
}
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
add_merged_rows(merged_rows);
*rowset = rowset_writer->build();
return true;
@@ -1186,11 +1210,12 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
}
Merger::Statistics stats;
- auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer, &stats);
+ auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer,
+ &stats);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to merge rowsets. tablet=" << new_tablet->full_name()
- << ", version=" << rowset_writer->version().first
- << "-" << rowset_writer->version().second;
+ << ", version=" << rowset_writer->version().first << "-"
+ << rowset_writer->version().second;
return false;
}
add_merged_rows(stats.merged_rows);
@@ -1207,7 +1232,8 @@ OLAPStatus SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2&
<< ", alter_version_hash=" << request.alter_version_hash;
// Lock schema_change_lock util schema change info is stored in tablet header
- if (!StorageEngine::instance()->tablet_manager()->try_schema_change_lock(request.base_tablet_id)) {
+ if (!StorageEngine::instance()->tablet_manager()->try_schema_change_lock(
+ request.base_tablet_id)) {
LOG(WARNING) << "failed to obtain schema change lock. "
<< "base_tablet=" << request.base_tablet_id;
return OLAP_ERR_TRY_LOCK_FAILED;
@@ -1254,7 +1280,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
return res;
}
- LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet to new tablet"
+ LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
+ "to new tablet"
<< " base_tablet=" << base_tablet->full_name()
<< " new_tablet=" << new_tablet->full_name();
@@ -1298,7 +1325,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
// should check the max_version >= request.alter_version, if not the convert is useless
RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
- LOG(WARNING) << "base tablet's max version=" << (max_rowset == nullptr ? 0 : max_rowset->end_version())
+ LOG(WARNING) << "base tablet's max version="
+ << (max_rowset == nullptr ? 0 : max_rowset->end_version())
<< " is less than request version=" << request.alter_version;
res = OLAP_ERR_VERSION_NOT_EXIST;
break;
@@ -1335,7 +1363,8 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
}
}
- res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(), end_version);
+ res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(),
+ end_version);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name()
<< ", end_version=" << end_version;
@@ -1395,7 +1424,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
break;
}
new_tablet->save_meta();
- } while(0);
+ } while (0);
if (res == OLAP_SUCCESS) {
// _validate_alter_result should be outside the above while loop.
@@ -1413,11 +1442,10 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
return res;
}
-OLAPStatus SchemaChangeHandler::schema_version_convert(
- TabletSharedPtr base_tablet,
- TabletSharedPtr new_tablet,
- RowsetSharedPtr* base_rowset,
- RowsetSharedPtr* new_rowset) {
+OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
+ TabletSharedPtr new_tablet,
+ RowsetSharedPtr* base_rowset,
+ RowsetSharedPtr* new_rowset) {
OLAPStatus res = OLAP_SUCCESS;
LOG(INFO) << "begin to convert delta version for schema changing. "
<< "base_tablet=" << base_tablet->full_name()
@@ -1429,11 +1457,8 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
bool sc_sorting = false;
bool sc_directly = false;
- if (OLAP_SUCCESS != (res = _parse_request(base_tablet,
- new_tablet,
- &rb_changer,
- &sc_sorting,
- &sc_directly))) {
+ if (OLAP_SUCCESS !=
+ (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly))) {
LOG(WARNING) << "failed to parse the request. res=" << res;
return res;
}
@@ -1444,16 +1469,16 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
SchemaChange* sc_procedure = nullptr;
if (sc_sorting) {
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
- LOG(INFO) << "doing schema change with sorting for base_tablet " << base_tablet->full_name();
- sc_procedure = new(nothrow) SchemaChangeWithSorting(
- rb_changer,
- memory_limitation * 1024 * 1024 * 1024);
+ LOG(INFO) << "doing schema change with sorting for base_tablet "
+ << base_tablet->full_name();
+ sc_procedure = new (nothrow)
+ SchemaChangeWithSorting(rb_changer, memory_limitation * 1024 * 1024 * 1024);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name();
- sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer);
+ sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
} else {
LOG(INFO) << "doing linked schema change for base_tablet " << base_tablet->full_name();
- sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer);
+ sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
}
if (sc_procedure == nullptr) {
@@ -1505,18 +1530,20 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(
if ((*base_rowset)->is_pending()) {
LOG(WARNING) << "failed to process the transaction when schema change. "
<< "tablet=" << new_tablet->full_name() << "'"
- << ", transaction="<< (*base_rowset)->txn_id();
+ << ", transaction=" << (*base_rowset)->txn_id();
} else {
LOG(WARNING) << "failed to process the version. "
- << "version=" << (*base_rowset)->version().first
- << "-" << (*base_rowset)->version().second;
+ << "version=" << (*base_rowset)->version().first << "-"
+ << (*base_rowset)->version().second;
}
res = OLAP_ERR_INPUT_PARAMETER_ERROR;
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
goto SCHEMA_VERSION_CONVERT_ERR;
}
*new_rowset = rowset_writer->build();
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
if (*new_rowset == nullptr) {
LOG(WARNING) << "build rowset failed.";
res = OLAP_ERR_MALLOC_ERROR;
@@ -1536,15 +1563,13 @@ SCHEMA_VERSION_CONVERT_ERR:
SAFE_DELETE(sc_procedure);
LOG(WARNING) << "failed to convert rowsets. "
- << " base_tablet=" << base_tablet->full_name()
- << ", new_tablet=" << new_tablet->full_name()
- << " res = " << res;
+ << " base_tablet=" << base_tablet->full_name()
+ << ", new_tablet=" << new_tablet->full_name() << " res = " << res;
return res;
}
OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed(
- TabletSharedPtr base_tablet,
- vector<Version>* versions_to_be_changed) {
+ TabletSharedPtr base_tablet, vector<Version>* versions_to_be_changed) {
RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
if (rowset == nullptr) {
LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name();
@@ -1552,7 +1577,8 @@ OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed(
}
vector<Version> span_versions;
- RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second), &span_versions));
+ RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second),
+ &span_versions));
for (uint32_t i = 0; i < span_versions.size(); i++) {
versions_to_be_changed->push_back(span_versions[i]);
}
@@ -1560,40 +1586,34 @@ OLAPStatus SchemaChangeHandler::_get_versions_to_be_changed(
return OLAP_SUCCESS;
}
-OLAPStatus SchemaChangeHandler::_add_alter_task(
- AlterTabletType alter_tablet_type,
- TabletSharedPtr base_tablet,
- TabletSharedPtr new_tablet,
- const vector<Version>& versions_to_be_changed) {
-
+OLAPStatus SchemaChangeHandler::_add_alter_task(AlterTabletType alter_tablet_type,
+ TabletSharedPtr base_tablet,
+ TabletSharedPtr new_tablet,
+ const vector<Version>& versions_to_be_changed) {
// check new tablet exists,
// prevent to set base's status after new's dropping (clear base's status)
if (StorageEngine::instance()->tablet_manager()->get_tablet(
- new_tablet->tablet_id(), new_tablet->schema_hash()) == nullptr) {
+ new_tablet->tablet_id(), new_tablet->schema_hash()) == nullptr) {
LOG(WARNING) << "new_tablet does not exist. tablet=" << new_tablet->full_name();
return OLAP_ERR_TABLE_NOT_FOUND;
}
// 1. 在新表和旧表中添加schema change标志
base_tablet->delete_alter_task();
- base_tablet->add_alter_task(new_tablet->tablet_id(),
- new_tablet->schema_hash(),
- versions_to_be_changed,
- alter_tablet_type);
+ base_tablet->add_alter_task(new_tablet->tablet_id(), new_tablet->schema_hash(),
+ versions_to_be_changed, alter_tablet_type);
base_tablet->save_meta();
- new_tablet->add_alter_task(base_tablet->tablet_id(),
- base_tablet->schema_hash(),
- vector<Version>(), // empty versions
+ new_tablet->add_alter_task(base_tablet->tablet_id(), base_tablet->schema_hash(),
+ vector<Version>(), // empty versions
alter_tablet_type);
new_tablet->save_meta();
LOG(INFO) << "successfully add alter task to both base and new";
return OLAP_SUCCESS;
}
-OLAPStatus SchemaChangeHandler::_save_alter_state(
- AlterTabletState state,
- TabletSharedPtr base_tablet,
- TabletSharedPtr new_tablet) {
+OLAPStatus SchemaChangeHandler::_save_alter_state(AlterTabletState state,
+ TabletSharedPtr base_tablet,
+ TabletSharedPtr new_tablet) {
WriteLock base_wlock(base_tablet->get_header_lock_ptr());
WriteLock new_wlock(new_tablet->get_header_lock_ptr());
AlterTabletTaskSharedPtr base_alter_task = base_tablet->alter_task();
@@ -1604,8 +1624,7 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
OLAPStatus res = base_tablet->set_alter_state(state);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to set alter state to " << state
- << " tablet=" << base_tablet->full_name()
- << " res=" << res;
+ << " tablet=" << base_tablet->full_name() << " res=" << res;
return res;
}
base_tablet->save_meta();
@@ -1616,9 +1635,8 @@ OLAPStatus SchemaChangeHandler::_save_alter_state(
}
res = new_tablet->set_alter_state(state);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "failed to set alter state to " << state
- << " tablet " << new_tablet->full_name()
- << " res" << res;
+ LOG(WARNING) << "failed to set alter state to " << state << " tablet "
+ << new_tablet->full_name() << " res" << res;
return res;
}
new_tablet->save_meta();
@@ -1641,16 +1659,16 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// change中增加了filter信息,在_parse_request中会设置filter的column信息
// 并在每次row block的change时,过滤一些数据
- RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(),
- sc_params.base_tablet, sc_params.delete_handler);
+ RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.base_tablet,
+ sc_params.delete_handler);
bool sc_sorting = false;
bool sc_directly = false;
SchemaChange* sc_procedure = nullptr;
// a. 解析Alter请求,转换成内部的表示形式
- OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet,
- &rb_changer, &sc_sorting, &sc_directly);
+ OLAPStatus res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
+ &sc_sorting, &sc_directly);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to parse the request. res=" << res;
goto PROCESS_ALTER_EXIT;
@@ -1659,15 +1677,18 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// b. 生成历史数据转换器
if (sc_sorting) {
size_t memory_limitation = config::memory_limitation_per_thread_for_schema_change;
- LOG(INFO) << "doing schema change with sorting for base_tablet " << sc_params.base_tablet->full_name();
- sc_procedure = new(nothrow) SchemaChangeWithSorting(rb_changer,
- memory_limitation * 1024 * 1024 * 1024);
+ LOG(INFO) << "doing schema change with sorting for base_tablet "
+ << sc_params.base_tablet->full_name();
+ sc_procedure = new (nothrow)
+ SchemaChangeWithSorting(rb_changer, memory_limitation * 1024 * 1024 * 1024);
} else if (sc_directly) {
- LOG(INFO) << "doing schema change directly for base_tablet " << sc_params.base_tablet->full_name();
- sc_procedure = new(nothrow) SchemaChangeDirectly(rb_changer);
+ LOG(INFO) << "doing schema change directly for base_tablet "
+ << sc_params.base_tablet->full_name();
+ sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
} else {
- LOG(INFO) << "doing linked schema change for base_tablet " << sc_params.base_tablet->full_name();
- sc_procedure = new(nothrow) LinkedSchemaChange(rb_changer);
+ LOG(INFO) << "doing linked schema change for base_tablet "
+ << sc_params.base_tablet->full_name();
+ sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
}
if (sc_procedure == nullptr) {
@@ -1679,8 +1700,8 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
// c. 转换历史数据
for (auto& rs_reader : sc_params.ref_rowset_readers) {
- VLOG(10) << "begin to convert a history rowset. version="
- << rs_reader->version().first << "-" << rs_reader->version().second;
+ VLOG(10) << "begin to convert a history rowset. version=" << rs_reader->version().first
+ << "-" << rs_reader->version().second;
// set status for monitor
// 只要有一个new_table为running,ref table就设置为running
@@ -1714,15 +1735,18 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
goto PROCESS_ALTER_EXIT;
}
- if (!sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet, sc_params.base_tablet)) {
+ if (!sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
+ sc_params.base_tablet)) {
LOG(WARNING) << "failed to process the version."
- << " version=" << rs_reader->version().first
- << "-" << rs_reader->version().second;
+ << " version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
res = OLAP_ERR_INPUT_PARAMETER_ERROR;
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
goto PROCESS_ALTER_EXIT;
}
- new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+ rowset_writer->rowset_id().to_string());
// 将新版本的数据加入header
// 为了防止死锁的出现,一定要先锁住旧表,再锁住新表
sc_params.new_tablet->obtain_push_lock();
@@ -1735,37 +1759,34 @@ OLAPStatus SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
res = sc_params.new_tablet->add_rowset(new_rowset, false);
if (res == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
LOG(WARNING) << "version already exist, version revert occured. "
- << "tablet=" << sc_params.new_tablet->full_name()
- << ", version='" << rs_reader->version().first
- << "-" << rs_reader->version().second;
+ << "tablet=" << sc_params.new_tablet->full_name() << ", version='"
+ << rs_reader->version().first << "-" << rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
res = OLAP_SUCCESS;
} else if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->full_name()
- << ", version=" << rs_reader->version().first
- << "-" << rs_reader->version().second;
+ << ", version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
sc_params.new_tablet->release_push_lock();
goto PROCESS_ALTER_EXIT;
} else {
VLOG(3) << "register new version. tablet=" << sc_params.new_tablet->full_name()
- << ", version=" << rs_reader->version().first
- << "-" << rs_reader->version().second;
+ << ", version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
}
sc_params.new_tablet->release_push_lock();
VLOG(10) << "succeed to convert a history version."
- << " version=" << rs_reader->version().first
- << "-" << rs_reader->version().second;
+ << " version=" << rs_reader->version().first << "-" << rs_reader->version().second;
}
// XXX: 此时应该不取消SchemaChange状态,因为新Delta还要转换成新旧Schema的版本
-PROCESS_ALTER_EXIT:
- {
- // save tablet meta here because rowset meta is not saved during add rowset
- WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
- sc_params.new_tablet->save_meta();
- }
+PROCESS_ALTER_EXIT : {
+ // save tablet meta here because rowset meta is not saved during add rowset
+ WriteLock new_wlock(sc_params.new_tablet->get_header_lock_ptr());
+ sc_params.new_tablet->save_meta();
+}
if (res == OLAP_SUCCESS) {
Version test_version(0, end_version);
res = sc_params.new_tablet->check_version_integrity(test_version);
@@ -1782,14 +1803,13 @@ PROCESS_ALTER_EXIT:
// 分析column的mapping以及filter key的mapping
OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
- RowBlockChanger* rb_changer,
- bool* sc_sorting,
+ RowBlockChanger* rb_changer, bool* sc_sorting,
bool* sc_directly) {
OLAPStatus res = OLAP_SUCCESS;
// set column mapping
for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns();
- i < new_schema_size; ++i) {
+ i < new_schema_size; ++i) {
const TabletColumn& new_column = new_tablet->tablet_schema().column(i);
const string& column_name = new_column.name();
ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
@@ -1799,8 +1819,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
if (column_index < 0) {
LOG(WARNING) << "referenced column was missing. "
- << "[column=" << column_name
- << " referenced_column=" << column_index << "]";
+ << "[column=" << column_name << " referenced_column=" << column_index
+ << "]";
return OLAP_ERR_CE_CMD_PARAMS_ERROR;
}
@@ -1825,10 +1845,8 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
*sc_directly = true;
}
- if (OLAP_SUCCESS != (res = _init_column_mapping(
- column_mapping,
- new_column,
- new_column.default_value()))) {
+ if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column,
+ new_column.default_value()))) {
return res;
}
@@ -1838,14 +1856,10 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
continue;
}
-
// XXX: 只有DROP COLUMN时,遇到新Schema转旧Schema时会进入这里。
column_mapping->ref_column = -1;
- if (OLAP_SUCCESS != (res = _init_column_mapping(
- column_mapping,
- new_column,
- ""))) {
+ if (OLAP_SUCCESS != (res = _init_column_mapping(column_mapping, new_column, ""))) {
return res;
}
@@ -1859,8 +1873,7 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
// 若Key列的引用序列出现乱序,则需要重排序
int num_default_value = 0;
- for (int i = 0, new_schema_size = new_tablet->num_key_columns();
- i < new_schema_size; ++i) {
+ for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) {
ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
if (column_mapping->ref_column < 0) {
@@ -1887,29 +1900,30 @@ OLAPStatus SchemaChangeHandler::_parse_request(TabletSharedPtr base_tablet,
if (column_mapping->ref_column < 0) {
continue;
} else {
- if (new_tablet_schema.column(i).type() != ref_tablet_schema.column(column_mapping->ref_column).type()) {
+ if (new_tablet_schema.column(i).type() !=
+ ref_tablet_schema.column(column_mapping->ref_column).type()) {
*sc_directly = true;
return OLAP_SUCCESS;
- } else if (
- (new_tablet_schema.column(i).type() == ref_tablet_schema.column(column_mapping->ref_column).type())
- && (new_tablet_schema.column(i).length()
- != ref_tablet_schema.column(column_mapping->ref_column).length())) {
+ } else if ((new_tablet_schema.column(i).type() ==
+ ref_tablet_schema.column(column_mapping->ref_column).type()) &&
+ (new_tablet_schema.column(i).length() !=
+ ref_tablet_schema.column(column_mapping->ref_column).length())) {
*sc_directly = true;
return OLAP_SUCCESS;
- } else if (new_tablet_schema.column(i).is_bf_column()
- != ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) {
+ } else if (new_tablet_schema.column(i).is_bf_column() !=
+ ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) {
*sc_directly = true;
return OLAP_SUCCESS;
- } else if (new_tablet_schema.column(i).has_bitmap_index()
- != ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) {
+ } else if (new_tablet_schema.column(i).has_bitmap_index() !=
+ ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) {
*sc_directly = true;
return OLAP_SUCCESS;
}
}
}
- if (base_tablet->delete_predicates().size() != 0){
+ if (base_tablet->delete_predicates().size() != 0) {
//there exists delete condition in header, can't do linked schema change
*sc_directly = true;
}
@@ -1941,10 +1955,12 @@ OLAPStatus SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mappi
return OLAP_SUCCESS;
}
-OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request) {
+OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
+ const TAlterTabletReqV2& request) {
Version max_continuous_version = {-1, 0};
VersionHash max_continuous_version_hash = 0;
- new_tablet->max_continuous_version_from_begining(&max_continuous_version, &max_continuous_version_hash);
+ new_tablet->max_continuous_version_from_begining(&max_continuous_version,
+ &max_continuous_version_hash);
LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name()
<< ", start_version=" << max_continuous_version.first
<< ", end_version=" << max_continuous_version.second;
@@ -1955,4 +1971,4 @@ OLAPStatus SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_table
}
}
-} // namespace doris
+} // namespace doris
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index ac6f55d..c749fec 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -23,11 +23,11 @@
#include <vector>
#include "gen_cpp/AgentService_types.h"
+#include "olap/column_mapping.h"
#include "olap/delete_handler.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
-#include "olap/column_mapping.h"
namespace doris {
// defined in 'field.h'
@@ -42,26 +42,19 @@ class RowCursor;
class RowBlockChanger {
public:
- RowBlockChanger(const TabletSchema& tablet_schema,
- const TabletSharedPtr& base_tablet,
+ RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet,
const DeleteHandler& delete_handler);
- RowBlockChanger(const TabletSchema& tablet_schema,
- const TabletSharedPtr& base_tablet);
+ RowBlockChanger(const TabletSchema& tablet_schema, const TabletSharedPtr& base_tablet);
virtual ~RowBlockChanger();
ColumnMapping* get_mutable_column_mapping(size_t column_index);
- SchemaMapping get_schema_mapping() const {
- return _schema_mapping;
- }
+ SchemaMapping get_schema_mapping() const { return _schema_mapping; }
- bool change_row_block(
- const RowBlock* ref_block,
- int32_t data_version,
- RowBlock* mutable_block,
- uint64_t* filtered_rows) const;
+ bool change_row_block(const RowBlock* ref_block, int32_t data_version, RowBlock* mutable_block,
+ uint64_t* filtered_rows) const;
private:
// @brief column-mapping specification of new schema
@@ -78,8 +71,7 @@ public:
RowBlockAllocator(const TabletSchema& tablet_schema, size_t memory_limitation);
virtual ~RowBlockAllocator();
- OLAPStatus allocate(RowBlock** row_block, size_t num_rows,
- bool null_supported);
+ OLAPStatus allocate(RowBlock** row_block, size_t num_rows, bool null_supported);
void release(RowBlock* row_block);
private:
@@ -94,34 +86,20 @@ public:
SchemaChange() : _filtered_rows(0), _merged_rows(0) {}
virtual ~SchemaChange() {}
- virtual bool process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_builder,
- TabletSharedPtr tablet,
- TabletSharedPtr base_tablet) = 0;
+ virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
+ TabletSharedPtr tablet, TabletSharedPtr base_tablet) = 0;
- void add_filtered_rows(uint64_t filtered_rows) {
- _filtered_rows += filtered_rows;
- }
+ void add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
- void add_merged_rows(uint64_t merged_rows) {
- _merged_rows += merged_rows;
- }
+ void add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
- uint64_t filtered_rows() const {
- return _filtered_rows;
- }
+ uint64_t filtered_rows() const { return _filtered_rows; }
- uint64_t merged_rows() const {
- return _merged_rows;
- }
+ uint64_t merged_rows() const { return _merged_rows; }
- void reset_filtered_rows() {
- _filtered_rows = 0;
- }
+ void reset_filtered_rows() { _filtered_rows = 0; }
- void reset_merged_rows() {
- _merged_rows = 0;
- }
+ void reset_merged_rows() { _merged_rows = 0; }
private:
uint64_t _filtered_rows;
@@ -131,13 +109,12 @@ private:
class LinkedSchemaChange : public SchemaChange {
public:
explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer)
- : _row_block_changer(row_block_changer) { }
+ : _row_block_changer(row_block_changer) {}
~LinkedSchemaChange() {}
- bool process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) override;
+ bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
private:
const RowBlockChanger& _row_block_changer;
DISALLOW_COPY_AND_ASSIGN(LinkedSchemaChange);
@@ -148,14 +125,11 @@ class SchemaChangeDirectly : public SchemaChange {
public:
// @params tablet the instance of tablet which has new schema.
// @params row_block_changer changer to modifiy the data of RowBlock
- explicit SchemaChangeDirectly(
- const RowBlockChanger& row_block_changer);
+ explicit SchemaChangeDirectly(const RowBlockChanger& row_block_changer);
virtual ~SchemaChangeDirectly();
- virtual bool process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_writer,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) override;
+ virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
private:
const RowBlockChanger& _row_block_changer;
@@ -170,30 +144,21 @@ private:
// @breif schema change with sorting
class SchemaChangeWithSorting : public SchemaChange {
public:
- explicit SchemaChangeWithSorting(
- const RowBlockChanger& row_block_changer,
- size_t memory_limitation);
+ explicit SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
+ size_t memory_limitation);
virtual ~SchemaChangeWithSorting();
- virtual bool process(RowsetReaderSharedPtr rowset_reader,
- RowsetWriter* new_rowset_builder,
- TabletSharedPtr new_tablet,
- TabletSharedPtr base_tablet) override;
+ virtual bool process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
+ TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
private:
- bool _internal_sorting(
- const std::vector<RowBlock*>& row_block_arr,
- const Version& temp_delta_versions,
- const VersionHash version_hash,
- TabletSharedPtr new_tablet,
- RowsetTypePB new_rowset_type,
- SegmentsOverlapPB segments_overlap,
- RowsetSharedPtr* rowset);
-
- bool _external_sorting(
- std::vector<RowsetSharedPtr>& src_rowsets,
- RowsetWriter* rowset_writer,
- TabletSharedPtr new_tablet);
+ bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
+ const Version& temp_delta_versions, const VersionHash version_hash,
+ TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
+ SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset);
+
+ bool _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet);
const RowBlockChanger& _row_block_changer;
size_t _memory_limitation;
@@ -209,13 +174,11 @@ public:
virtual ~SchemaChangeHandler() {}
OLAPStatus process_alter_tablet(AlterTabletType alter_tablet_type,
- const TAlterTabletReq& request);
+ const TAlterTabletReq& request);
+
+ OLAPStatus schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
+ RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset);
- OLAPStatus schema_version_convert(TabletSharedPtr base_tablet,
- TabletSharedPtr new_tablet,
- RowsetSharedPtr* base_rowset,
- RowsetSharedPtr* new_rowset);
-
// schema change v2, it will not set alter task in base tablet
OLAPStatus process_alter_tablet_v2(const TAlterTabletReqV2& request);
@@ -241,36 +204,33 @@ private:
// add alter task to base_tablet and new_tablet.
// add A->(B|C|...) relation chain to all of them.
- OLAPStatus _add_alter_task(AlterTabletType alter_tablet_type,
- TabletSharedPtr base_tablet,
+ OLAPStatus _add_alter_task(AlterTabletType alter_tablet_type, TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet,
const std::vector<Version>& versions_to_be_changed);
- OLAPStatus _save_alter_state(AlterTabletState state,
- TabletSharedPtr base_tablet,
+ OLAPStatus _save_alter_state(AlterTabletState state, TabletSharedPtr base_tablet,
TabletSharedPtr new_tablet);
-
+
OLAPStatus _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
-
+
OLAPStatus _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request);
static OLAPStatus _convert_historical_rowsets(const SchemaChangeParams& sc_params);
- static OLAPStatus _parse_request(TabletSharedPtr base_tablet,
- TabletSharedPtr new_tablet,
- RowBlockChanger* rb_changer,
- bool* sc_sorting,
+ static OLAPStatus _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
+ RowBlockChanger* rb_changer, bool* sc_sorting,
bool* sc_directly);
// Initialization settings used when a new default_value needs to be created
static OLAPStatus _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema,
const std::string& value);
+
private:
RowsetReaderContext _reader_context;
DISALLOW_COPY_AND_ASSIGN(SchemaChangeHandler);
};
-} // namespace doris
+} // namespace doris
#endif // DORIS_BE_SRC_OLAP_SCHEMA_CHANGE_H
diff --git a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
index c57f3ee..d1708d9 100644
--- a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
+++ b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
@@ -154,7 +154,6 @@ under the License.
5) The following types of conversions are currently supported (the user is responsible for any loss of precision)
TINYINT/SMALLINT/INT/BIGINT is converted to TINYINT/SMALLINT/INT/BIGINT/DOUBLE.
TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE/DECIMAL is converted to VARCHAR
- Convert LARGEINT to DOUBLE
VARCHAR supports modification of maximum length
Convert VARCHAR to TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE.
Convert VARCHAR to DATE (currently supports six formats: "%Y-%m-%d", "%y-%m-%d", "%Y%m%d", "%y%m%d", "%Y/%m/%d", "%y/%m/%d")
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
index 4f5f9e6..3ac35b4 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md
@@ -152,7 +152,6 @@ under the License.
5) 目前支持以下类型的转换(精度损失由用户保证)
TINYINT/SMALLINT/INT/BIGINT 转换成 TINYINT/SMALLINT/INT/BIGINT/DOUBLE。
TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE/DECIMAL 转换成 VARCHAR
- LARGEINT 转换成 DOUBLE
VARCHAR 支持修改最大长度
VARCHAR 转换成 TINYINT/SMALLINT/INT/BIGINT/LARGEINT/FLOAT/DOUBLE
VARCHAR 转换成 DATE (目前支持"%Y-%m-%d", "%y-%m-%d", "%Y%m%d", "%y%m%d", "%Y/%m/%d", "%y/%m/%d"六种格式化格式)
diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java
index ae67839..53b65c7 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Column.java
@@ -288,6 +288,12 @@ public class Column implements Writable {
}
}
+ // Converting DECIMAL/DECIMALV2 to VARCHAR is supported, so return before the precision check below.
+ if ((getDataType() == PrimitiveType.DECIMAL && other.getDataType() == PrimitiveType.VARCHAR)
+ || (getDataType() == PrimitiveType.DECIMALV2 && other.getDataType() == PrimitiveType.VARCHAR)) {
+ return;
+ }
+
if (this.getPrecision() != other.getPrecision()) {
throw new DdlException("Cannot change precision");
}
diff --git a/fe/src/main/java/org/apache/doris/catalog/ColumnType.java b/fe/src/main/java/org/apache/doris/catalog/ColumnType.java
index 557a6f5..f27e414 100644
--- a/fe/src/main/java/org/apache/doris/catalog/ColumnType.java
+++ b/fe/src/main/java/org/apache/doris/catalog/ColumnType.java
@@ -75,14 +75,17 @@ public abstract class ColumnType {
schemaChangeMatrix[PrimitiveType.CHAR.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.CHAR.ordinal()][PrimitiveType.CHAR.ordinal()] = true;
- schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DATE.ordinal()] = true;
- schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.FLOAT.ordinal()] = true;
- schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DOUBLE.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.TINYINT.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.SMALLINT.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.INT.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.BIGINT.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.LARGEINT.ordinal()] = true;
+ schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.FLOAT.ordinal()] = true;
+ schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DOUBLE.ordinal()] = true;
+ schemaChangeMatrix[PrimitiveType.VARCHAR.ordinal()][PrimitiveType.DATE.ordinal()] = true;
+
+ schemaChangeMatrix[PrimitiveType.DECIMAL.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true;
+ schemaChangeMatrix[PrimitiveType.DECIMALV2.ordinal()][PrimitiveType.VARCHAR.ordinal()] = true;
schemaChangeMatrix[PrimitiveType.DATETIME.ordinal()][PrimitiveType.DATE.ordinal()] = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org