Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/16 07:56:59 UTC

[GitHub] [incubator-doris] BiteTheDDDDt opened a new pull request, #10187: [Feature] [Vectorized] support vectorized schema-change

BiteTheDDDDt opened a new pull request, #10187:
URL: https://github.com/apache/incubator-doris/pull/10187

   # Proposed changes
   
   #9810
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] HappenLee commented on a diff in pull request #10187: [Feature] [Vectorized] support vectorized schema-change

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10187:
URL: https://github.com/apache/doris/pull/10187#discussion_r905860563


##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // The block version is incremental.
+        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
+
+        if (!row_refs.size()) {
+            return Status::OK();
+        }
+
+        auto finalized_block = _tablet->tablet_schema().create_block();
+        int rows = row_refs.size();
+        int columns = finalized_block.columns();
+        *merged_rows += rows;
+
+        std::vector<RowRef> pushed_row_refs;
+        if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+            std::swap(pushed_row_refs, row_refs);
+        } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+            for (int i = 0; i < rows; i++) {
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    pushed_row_refs.push_back(row_refs[i]);
+                }
+            }
+        } else {

Review Comment:
   The logic here is confusing and should be restructured.
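
One reading of the `UNIQUE_KEYS` branch quoted above: because the blocks arrive in increasing version order and `std::stable_sort` preserves input order among equal keys, the last row in each run of equal keys is the newest one, and only that row is kept. The sketch below spells out that invariant on its own. `Row` and `dedup_keep_newest` are hypothetical names used for illustration and are not part of the PR; a single `int` key stands in for the tablet's key columns.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical row with a single key column and a single value column.
struct Row {
    int key;
    int value;
};

// For each key, keep the row that appeared last in the input.
// Assumption: input order equals version order (older rows first).
std::vector<Row> dedup_keep_newest(std::vector<Row> rows) {
    // A stable sort keeps rows with equal keys in their input order,
    // so the newest version ends up last within each run of equal keys.
    std::stable_sort(rows.begin(), rows.end(),
                     [](const Row& a, const Row& b) { return a.key < b.key; });

    std::vector<Row> kept;
    for (std::size_t i = 0; i < rows.size(); ++i) {
        const bool last_of_run =
                (i + 1 == rows.size()) || (rows[i].key != rows[i + 1].key);
        if (last_of_run) {
            kept.push_back(rows[i]);
        }
    }
    assert(kept.size() <= rows.size());
    return kept;
}
```

Naming the condition (`last_of_run`) instead of inlining `i == rows - 1 || compare(...)` is one way to make the intent easier to follow.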





[GitHub] [doris] HappenLee commented on a diff in pull request #10187: [Feature] [Vectorized] support vectorized schema-change

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10187:
URL: https://github.com/apache/doris/pull/10187#discussion_r905767486


##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // The block version is incremental.
+        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
+
+        if (!row_refs.size()) {
+            return Status::OK();
+        }
+
+        auto finalized_block = _tablet->tablet_schema().create_block();
+        int rows = row_refs.size();
+        int columns = finalized_block.columns();
+        *merged_rows += rows;
+
+        std::vector<RowRef> pushed_row_refs;
+        if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+            std::swap(pushed_row_refs, row_refs);
+        } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+            for (int i = 0; i < rows; i++) {
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    pushed_row_refs.push_back(row_refs[i]);
+                }
+            }
+        } else {
+            auto tablet_schema = _tablet->tablet_schema();
+            int key_number = _tablet->num_key_columns();
+
+            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
+            std::vector<vectorized::AggregateDataPtr> agg_places;
+
+            for (int i = key_number; i < columns; i++) {
+                vectorized::AggregateFunctionPtr function =
+                        tablet_schema.column(i).get_aggregate_function(
+                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
+                agg_functions.push_back(function);
+                // create aggregate data
+                vectorized::AggregateDataPtr place = new char[function->size_of_data()];
+                function->create(place);
+                agg_places.push_back(place);
+            }
+
+            for (int i = 0; i < rows; i++) {
+                auto row_ref = row_refs[i];
+
+                for (int j = key_number; j < columns; j++) {
+                    auto column_ptr = row_ref.get_column(j).get();
+                    agg_functions[j - key_number]->add(
+                            agg_places[j - key_number],
+                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
+                            nullptr);
+                }
+
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    for (int j = 0; j < key_number; j++) {
+                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
+                                *row_ref.get_column(j), row_ref.position);
+                    }
+
+                    for (int j = key_number; j < columns; j++) {
+                        agg_functions[j - key_number]->insert_result_into(
+                                agg_places[j - key_number],
+                                finalized_block.get_by_position(j).column->assume_mutable_ref());
+                        agg_functions[j - key_number]->create(agg_places[j - key_number]);
+                    }
+
+                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
+                        *merged_rows -= finalized_block.rows();
+                        rowset_writer->add_block(&finalized_block);
+                        finalized_block.clear_column_data();
+                    }
+                }
+            }
+
+            for (int i = 0; i < columns - key_number; i++) {
+                agg_functions[i]->destroy(agg_places[i]);
+                delete[] agg_places[i];
+            }
+        }
+
+        // update real inserted row number
+        rows = pushed_row_refs.size();
+        *merged_rows -= rows;
+
+        for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
+            int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
+
+            for (int idx = 0; idx < columns; idx++) {
+                auto column = finalized_block.get_by_position(idx).column->assume_mutable();
+
+                for (int j = 0; j < limit; j++) {
+                    auto row_ref = pushed_row_refs[i + j];
+                    column->insert_from(*row_ref.get_column(idx), row_ref.position);
+                }
+            }
+            rowset_writer->add_block(&finalized_block);
+            finalized_block.clear_column_data();
+        }
+
+        RETURN_IF_ERROR(rowset_writer->flush());
+        return Status::OK();
+    }
+
+private:
+    struct RowRef {
+        RowRef(vectorized::Block* block_, uint16_t position_)
+                : block(block_), position(position_) {}
+        vectorized::ColumnPtr get_column(int index) const {
+            return block->get_by_position(index).column;
+        }
+        const vectorized::Block* block;
+        uint16_t position;
+    };
+
+    struct RowRefComparator {
+        RowRefComparator(TabletSharedPtr tablet) : num_columns(tablet->num_key_columns()) {}
+
+        int compare(const RowRef& lhs, const RowRef& rhs) const {
+            return lhs.block->compare_at(lhs.position, rhs.position, num_columns, *rhs.block, -1);
+        }
+
+        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
+            return compare(lhs, rhs) < 0;
+        }
+
+        const size_t num_columns;

Review Comment:
   Rename this member to `_num_columns`.



##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // The block version is incremental.

Review Comment:
   Add a TODO: `use pdsort to do sort`



##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;

Review Comment:
   Should reserve `row_refs` up front, sized by `blocks.rows()`.
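
A minimal sketch of what this suggestion could look like. `blocks` is a `std::vector`, which has no `rows()` member, so the sketch assumes the intent is to sum `rows()` over all blocks first and reserve that total. `Block`, `RowRef`, and `collect_row_refs` are simplified, hypothetical stand-ins rather than the Doris types, and the position is a `std::size_t` here instead of the PR's `uint16_t`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for vectorized::Block; only rows() is modeled.
struct Block {
    std::size_t num_rows;
    std::size_t rows() const { return num_rows; }
};

// Hypothetical stand-in for the PR's RowRef.
struct RowRef {
    const Block* block;
    std::size_t position;
};

std::vector<RowRef> collect_row_refs(const std::vector<std::unique_ptr<Block>>& blocks) {
    // First pass: count rows so the vector is allocated exactly once.
    std::size_t total_rows = 0;
    for (const auto& block : blocks) {
        total_rows += block->rows();
    }

    std::vector<RowRef> row_refs;
    row_refs.reserve(total_rows);

    // Second pass: one reference per row, in block order.
    for (const auto& block : blocks) {
        for (std::size_t i = 0; i < block->rows(); ++i) {
            row_refs.push_back(RowRef{block.get(), i});
        }
    }
    assert(row_refs.size() == total_rows);
    return row_refs;
}
```

The extra pass over `blocks` only touches one counter per block, so it is cheap compared with the repeated reallocations it avoids.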





[GitHub] [doris] HappenLee commented on a diff in pull request #10187: [Feature] [Vectorized] support vectorized schema-change

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #10187:
URL: https://github.com/apache/doris/pull/10187#discussion_r906946575


##########
be/src/olap/schema_change.cpp:
##########
@@ -665,6 +812,112 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
 #undef TYPE_REINTERPRET_CAST
 #undef ASSIGN_DEFAULT_VALUE
 
+Status RowBlockChanger::change_block(vectorized::Block* ref_block,
+                                     vectorized::Block* new_block) const {
+    if (new_block->columns() != _schema_mapping.size()) {
+        LOG(WARNING) << "block does not match with schema mapping rules. "
+                     << "block_schema_size=" << new_block->columns()
+                     << ", mapping_schema_size=" << _schema_mapping.size();
+        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
+    }
+
+    // material-view or rollup task will fail now
+    if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
+        return Status::NotSupported(
+                "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
+                "not supported now. ");
+    }
+
+    std::vector<bool> nullable_tuples;
+    for (int i = 0; i < ref_block->columns(); i++) {
+        nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
+    }
+
+    ObjectPool pool;
+    RuntimeState* state = pool.add(new RuntimeState());
+    state->set_desc_tbl(&_desc_tbl);
+    RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
+
+    const int row_size = ref_block->rows();
+    const int column_size = new_block->columns();
+
+    // swap ref_block[key] and new_block[value]
+    std::map<int, int> swap_idx_map;
+
+    for (int idx = 0; idx < column_size; idx++) {
+        int ref_idx = _schema_mapping[idx].ref_column;
+
+        if (ref_idx < 0) {
+            // new column, write default value
+            auto value = _schema_mapping[idx].default_value;
+            auto column = new_block->get_by_position(idx).column->assume_mutable();
+            if (value->is_null()) {
+                DCHECK(column->is_nullable());
+                column->insert_many_defaults(row_size);
+            } else {
+                auto type_info = get_type_info(_schema_mapping[idx].new_column);
+                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
+                                                                value->ptr(), column, row_size);
+            }
+            continue;
+        }
+
+        if (!_schema_mapping[idx].materialized_function.empty()) {
+            return Status::NotSupported("Materialized function not supported now. ");
+        }
+
+        if (_schema_mapping[idx].expr != nullptr) {
+            // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr
+            vectorized::VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx));
+
+            RETURN_IF_ERROR(ctx->prepare(state, row_desc));
+            RETURN_IF_ERROR(ctx->open(state));
+
+            int result_column_id = -1;
+            RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
+            DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
+                    << new_block->get_by_position(idx).name << " size invalid"
+                    << ", expect=" << row_size
+                    << ", real=" << ref_block->get_by_position(result_column_id).column->size();
+
+            if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) {
+                RETURN_IF_ERROR(
+                        _check_cast_valid(ref_block->get_by_position(ref_idx).column,
+                                          ref_block->get_by_position(result_column_id).column));
+            }
+            swap_idx_map[result_column_id] = idx;
+
+            ctx->close(state);
+            continue;
+        }
+
+        // same type, just swap column
+        swap_idx_map[ref_idx] = idx;
+    }
+
+    for (auto it : swap_idx_map) {
+        new_block->get_by_position(it.second).column.swap(
+                ref_block->get_by_position(it.first).column);
+    }
+
+    return Status::OK();
+}
+
+Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
+                                          vectorized::ColumnPtr new_column) const {
+    // TODO: rethink this check
+    // This check is to prevent schema-change from causing data loss,
+    // But it is possible to generate null data in material-view or rollup.
+    for (size_t i = 0; i < ref_column->size(); i++) {

Review Comment:
   Vectorize this loop with SIMD.
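
The body of the loop is not shown in the quoted diff, so the following is only a sketch under an assumption: that the check compares per-row null flags of the original and the cast column. If so, one common way to make such a loop friendly to compiler auto-vectorization is to drop the early return and accumulate differences without branching. `null_maps_match` and the one-byte-per-row null map layout are hypothetical, not taken from the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns true when both null maps flag exactly the same rows as null.
// Assumption: one byte per row, 1 = null, 0 = not null.
bool null_maps_match(const std::vector<std::uint8_t>& ref_nulls,
                     const std::vector<std::uint8_t>& new_nulls) {
    assert(ref_nulls.size() == new_nulls.size());

    const std::uint8_t* a = ref_nulls.data();
    const std::uint8_t* b = new_nulls.data();
    const std::size_t n = ref_nulls.size();

    // No branch and no early exit inside the loop: XOR marks rows whose
    // null flag changed, OR folds all rows into a single byte.
    std::uint8_t diff = 0;
    for (std::size_t i = 0; i < n; ++i) {
        diff |= static_cast<std::uint8_t>(a[i] ^ b[i]);
    }
    return diff == 0;
}
```

The trade-off is that the loop always scans every row, even when the first row already differs; for the expected case where the cast is valid, that costs nothing extra.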



##########
be/src/olap/schema_change.cpp:
##########
@@ -665,6 +812,112 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
 #undef TYPE_REINTERPRET_CAST
 #undef ASSIGN_DEFAULT_VALUE
 
+Status RowBlockChanger::change_block(vectorized::Block* ref_block,
+                                     vectorized::Block* new_block) const {
+    if (new_block->columns() != _schema_mapping.size()) {
+        LOG(WARNING) << "block does not match with schema mapping rules. "
+                     << "block_schema_size=" << new_block->columns()
+                     << ", mapping_schema_size=" << _schema_mapping.size();
+        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
+    }
+
+    // material-view or rollup task will fail now
+    if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
+        return Status::NotSupported(
+                "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
+                "not supported now. ");
+    }
+
+    std::vector<bool> nullable_tuples;
+    for (int i = 0; i < ref_block->columns(); i++) {
+        nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
+    }
+
+    ObjectPool pool;
+    RuntimeState* state = pool.add(new RuntimeState());
+    state->set_desc_tbl(&_desc_tbl);
+    RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
+
+    const int row_size = ref_block->rows();
+    const int column_size = new_block->columns();
+
+    // swap ref_block[key] and new_block[value]
+    std::map<int, int> swap_idx_map;
+
+    for (int idx = 0; idx < column_size; idx++) {

Review Comment:
   Change the logic of this for loop.





[GitHub] [doris] HappenLee merged pull request #10187: [Feature] [Vectorized] support vectorized schema-change

Posted by GitBox <gi...@apache.org>.
HappenLee merged PR #10187:
URL: https://github.com/apache/doris/pull/10187

