You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/24 08:48:00 UTC

[GitHub] [doris] HappenLee commented on a diff in pull request #10187: [Feature] [Vectorized] support vectorized schema-change

HappenLee commented on code in PR #10187:
URL: https://github.com/apache/doris/pull/10187#discussion_r905767486


##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // The block version is incremental.
+        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
+
+        if (!row_refs.size()) {
+            return Status::OK();
+        }
+
+        auto finalized_block = _tablet->tablet_schema().create_block();
+        int rows = row_refs.size();
+        int columns = finalized_block.columns();
+        *merged_rows += rows;
+
+        std::vector<RowRef> pushed_row_refs;
+        if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+            std::swap(pushed_row_refs, row_refs);
+        } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+            for (int i = 0; i < rows; i++) {
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    pushed_row_refs.push_back(row_refs[i]);
+                }
+            }
+        } else {
+            auto tablet_schema = _tablet->tablet_schema();
+            int key_number = _tablet->num_key_columns();
+
+            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
+            std::vector<vectorized::AggregateDataPtr> agg_places;
+
+            for (int i = key_number; i < columns; i++) {
+                vectorized::AggregateFunctionPtr function =
+                        tablet_schema.column(i).get_aggregate_function(
+                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
+                agg_functions.push_back(function);
+                // create aggregate data
+                vectorized::AggregateDataPtr place = new char[function->size_of_data()];
+                function->create(place);
+                agg_places.push_back(place);
+            }
+
+            for (int i = 0; i < rows; i++) {
+                auto row_ref = row_refs[i];
+
+                for (int j = key_number; j < columns; j++) {
+                    auto column_ptr = row_ref.get_column(j).get();
+                    agg_functions[j - key_number]->add(
+                            agg_places[j - key_number],
+                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
+                            nullptr);
+                }
+
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    for (int j = 0; j < key_number; j++) {
+                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
+                                *row_ref.get_column(j), row_ref.position);
+                    }
+
+                    for (int j = key_number; j < columns; j++) {
+                        agg_functions[j - key_number]->insert_result_into(
+                                agg_places[j - key_number],
+                                finalized_block.get_by_position(j).column->assume_mutable_ref());
+                        agg_functions[j - key_number]->create(agg_places[j - key_number]);
+                    }
+
+                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
+                        *merged_rows -= finalized_block.rows();
+                        rowset_writer->add_block(&finalized_block);
+                        finalized_block.clear_column_data();
+                    }
+                }
+            }
+
+            for (int i = 0; i < columns - key_number; i++) {
+                agg_functions[i]->destroy(agg_places[i]);
+                delete[] agg_places[i];
+            }
+        }
+
+        // update real inserted row number
+        rows = pushed_row_refs.size();
+        *merged_rows -= rows;
+
+        for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
+            int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
+
+            for (int idx = 0; idx < columns; idx++) {
+                auto column = finalized_block.get_by_position(idx).column->assume_mutable();
+
+                for (int j = 0; j < limit; j++) {
+                    auto row_ref = pushed_row_refs[i + j];
+                    column->insert_from(*row_ref.get_column(idx), row_ref.position);
+                }
+            }
+            rowset_writer->add_block(&finalized_block);
+            finalized_block.clear_column_data();
+        }
+
+        RETURN_IF_ERROR(rowset_writer->flush());
+        return Status::OK();
+    }
+
+private:
+    struct RowRef {
+        RowRef(vectorized::Block* block_, uint16_t position_)
+                : block(block_), position(position_) {}
+        vectorized::ColumnPtr get_column(int index) const {
+            return block->get_by_position(index).column;
+        }
+        const vectorized::Block* block;
+        uint16_t position;
+    };
+
+    struct RowRefComparator {
+        RowRefComparator(TabletSharedPtr tablet) : num_columns(tablet->num_key_columns()) {}
+
+        int compare(const RowRef& lhs, const RowRef& rhs) const {
+            return lhs.block->compare_at(lhs.position, rhs.position, num_columns, *rhs.block, -1);
+        }
+
+        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
+            return compare(lhs, rhs) < 0;
+        }
+
+        const size_t num_columns;

Review Comment:
   use `_num_columns`



##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // The block version is incremental.

Review Comment:
   Add TODOļ¼š`use pdsort to do sort`



##########
be/src/olap/schema_change.cpp:
##########
@@ -91,6 +92,147 @@ class RowBlockMerger {
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        std::vector<RowRef> row_refs;

Review Comment:
   should reserve the row_refs by `blocks.rows()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org