Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/10/13 13:44:24 UTC

[GitHub] [doris] yixiutt opened a new pull request, #13359: [feature](vertical_compaction) support vertical compaction

yixiutt opened a new pull request, #13359:
URL: https://github.com/apache/doris/pull/13359

   # Proposed changes
   
   Issue Number: close Issue #12966 
   
   Support vertical compaction to reduce memory usage during compaction.
   
   Main design:
   Split the columns into several groups. When compacting the key group, record the order of the output rows in a RowSourcesBuffer; then use that RowSourcesBuffer to compact the value groups.
   
   Result:
   In the ClickBench test, vertical compaction uses 1/10 of the memory of horizontal compaction and is 15% faster.
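
The main design above can be illustrated with a small sketch. It is not the code from this PR: `Run`, `merge_keys`, and `merge_values` are hypothetical names, rows live in plain in-memory vectors, and duplicate-key handling and spilling the buffer to a temporary file are left out. The key pass merges the sorted inputs and records which input each output row came from; the value pass replays that record without comparing keys again.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for one sorted input rowset with a single key
// column and a single value column.
struct Run {
    std::vector<int> keys;    // sorted ascending
    std::vector<int> values;  // values[i] belongs to keys[i]
};

// Key pass: merge the key columns of all inputs and record, for every
// output row, the index of the input it was taken from.
std::vector<uint16_t> merge_keys(const std::vector<Run>& runs, std::vector<int>* out_keys) {
    std::vector<size_t> pos(runs.size(), 0);
    std::vector<uint16_t> row_sources;
    for (;;) {
        size_t best = runs.size();  // sentinel: no input has rows left
        for (size_t i = 0; i < runs.size(); ++i) {
            if (pos[i] == runs[i].keys.size()) continue;
            if (best == runs.size() || runs[i].keys[pos[i]] < runs[best].keys[pos[best]]) {
                best = i;
            }
        }
        if (best == runs.size()) break;
        out_keys->push_back(runs[best].keys[pos[best]]);
        ++pos[best];
        row_sources.push_back(static_cast<uint16_t>(best));
    }
    return row_sources;
}

// Value pass: replay the recorded sources for a value column. No key
// comparison is needed, and the key column does not have to be in memory.
std::vector<int> merge_values(const std::vector<Run>& runs,
                              const std::vector<uint16_t>& row_sources) {
    std::vector<size_t> pos(runs.size(), 0);
    std::vector<int> out;
    out.reserve(row_sources.size());
    for (uint16_t src : row_sources) {
        out.push_back(runs[src].values[pos[src]]);
        ++pos[src];
    }
    return out;
}
```

Because the value pass only needs the recorded sources, each value group can be merged on its own, which is presumably where the memory saving reported above comes from.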
   
   ## Problem summary
   
   Describe your changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: 
       - [ ] Yes
       - [ ] No
       - [ ] I don't know
   2. Has unit tests been added:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   3. Has document been added or modified:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   4. Does it need to update dependencies:
       - [ ] Yes
       - [ ] No
   5. Are there any changes that cannot be rolled back:
       - [ ] Yes (If Yes, please explain WHY)
       - [ ] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] zhannngchen commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
zhannngchen commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1006574754


##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -299,11 +322,12 @@ uint64_t SegmentWriter::estimate_segment_size() {
     return size;
 }
 
-Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
-    // check disk capacity
-    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
-        return Status::InternalError("disk {} exceed capacity limit.", _data_dir->path_hash());
+Status SegmentWriter::finalize_columns(uint64_t* index_size) {
+    if (_has_key) {
+        _row_count = _num_rows_written;
     }
+    _num_rows_written = 0;

Review Comment:
   Should there be a check here?
   ```
       if (_has_key) {
           _row_count = _num_rows_written;
       } else {
           CHECK_EQ(_row_count, _num_rows_written);
       }
   ```



##########
regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy:
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_vertical_compaction_uniq_keys") {
+    def tableName = "vertical_compaction_uniq_keys_regression_test"
+
+    def set_be_config = { ->
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        for (String[] backend in backends) {
+            StringBuilder setConfigCommand = new StringBuilder();
+            setConfigCommand.append("curl -X POST http://")
+            setConfigCommand.append(backend[2])
+            setConfigCommand.append(":")
+            setConfigCommand.append(backend[5])
+            setConfigCommand.append("/api/update_config?")
+            String command1 = setConfigCommand.toString() + "enable_vertical_compaction=true"
+            logger.info(command1)
+            def process1 = command1.execute()
+            int code = process1.waitFor()
+            assertEquals(code, 0)
+        }
+    }
+    def reset_be_config = { ->
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        for (String[] backend in backends) {
+            StringBuilder setConfigCommand = new StringBuilder();
+            setConfigCommand.append("curl -X POST http://")
+            setConfigCommand.append(backend[2])
+            setConfigCommand.append(":")
+            setConfigCommand.append(backend[5])
+            setConfigCommand.append("/api/update_config?")
+            String command1 = setConfigCommand.toString() + "enable_vertical_compaction=false"
+            logger.info(command1)
+            def process1 = command1.execute()
+            int code = process1.waitFor()
+            assertEquals(code, 0)
+        }
+    }
+
+    try {
+        //BackendId,Cluster,IP,HeartbeatPort,BePort,HttpPort,BrpcPort,LastStartTime,LastHeartbeat,Alive,SystemDecommissioned,ClusterDecommissioned,TabletNum,DataUsedCapacity,AvailCapacity,TotalCapacity,UsedPct,MaxDiskUsedPct,Tag,ErrMsg,Version,Status
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        for (String[] backend in backends) {
+            backendId_to_backendIP.put(backend[0], backend[2])
+            backendId_to_backendHttpPort.put(backend[0], backend[5])
+        }
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        StringBuilder showConfigCommand = new StringBuilder();
+        showConfigCommand.append("curl -X GET http://")
+        showConfigCommand.append(backendId_to_backendIP.get(backend_id))
+        showConfigCommand.append(":")
+        showConfigCommand.append(backendId_to_backendHttpPort.get(backend_id))
+        showConfigCommand.append("/api/show_config")
+        logger.info(showConfigCommand.toString())
+        def process = showConfigCommand.toString().execute()
+        int code = process.waitFor()
+        String err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        String out = process.getText()
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+        set_be_config.call()
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                `user_id` LARGEINT NOT NULL COMMENT "user id",
+                `date` DATE NOT NULL COMMENT "data ingestion date and time",
+                `datev2` DATEV2 NOT NULL COMMENT "data ingestion date and time",
+                `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "data ingestion date and time",
+                `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "data ingestion date and time",
+                `city` VARCHAR(20) COMMENT "user's city",
+                `age` SMALLINT COMMENT "user's age",
+                `sex` TINYINT COMMENT "user's gender",
+                `last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
+                `last_update_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user's last update time",
+                `datetime_val1` DATETIMEV2(3) DEFAULT "1970-01-01 00:00:00.111" COMMENT "user's last visit time",
+                `datetime_val2` DATETIME(6) DEFAULT "1970-01-01 00:00:00" COMMENT "user's last update time",
+                `last_visit_date_not_null` DATETIME NOT NULL DEFAULT "1970-01-01 00:00:00" COMMENT "user's last visit time",
+                `cost` BIGINT DEFAULT "0" COMMENT "user's total spending",
+                `max_dwell_time` INT DEFAULT "0" COMMENT "user's maximum dwell time",
+                `min_dwell_time` INT DEFAULT "99999" COMMENT "user's minimum dwell time")
+            UNIQUE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, `datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
+            PROPERTIES ( "replication_num" = "1" );
+        """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, '2020-01-01', '2020-01-01', '2017-10-01 11:11:11.170000', '2017-10-01 11:11:11.110111', '2020-01-01', 1, 30, 20)
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2017-10-01 11:11:11.160000', '2017-10-01 11:11:11.100111', '2020-01-02', 1, 31, 19)

Review Comment:
   Shouldn't this test also cover the delete sign and delete queries?



##########
be/src/olap/compaction.cpp:
##########
@@ -112,14 +113,50 @@ Status Compaction::quick_rowsets_compact() {
 
 Status Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
+    uint32_t checksum_before;
+    uint32_t checksum_after;
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
+                                         _input_rowsets.back()->end_version(), &checksum_before);
+        checksum_task.execute();
+    }
+
     _tablet->data_dir()->disks_compaction_score_increment(permits);
     _tablet->data_dir()->disks_compaction_num_increment(1);
     Status st = do_compaction_impl(permits);
     _tablet->data_dir()->disks_compaction_score_increment(-permits);
     _tablet->data_dir()->disks_compaction_num_increment(-1);
+
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
+                                         _input_rowsets.back()->end_version(), &checksum_after);
+        checksum_task.execute();
+        if (checksum_before != checksum_after) {

Review Comment:
   Should use DCHECK here





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1009220294


##########
be/src/olap/rowset/segment_v2/segment_writer.h:
##########
@@ -120,11 +130,16 @@ class SegmentWriter {
     std::unique_ptr<MemTracker> _mem_tracker;
     uint32_t _row_count = 0;
 
-    vectorized::OlapBlockDataConvertor _olap_data_convertor;
+    std::unique_ptr<vectorized::OlapBlockDataConvertor> _olap_data_convertor;
     // used for building short key index or primary key index during vectorized write.
     std::vector<const KeyCoder*> _key_coders;
     std::vector<uint16_t> _key_index_size;
     size_t _short_key_row_pos = 0;
+
+    std::vector<uint32_t> _column_ids;
+    bool _has_key = true;
+    // written when add particial columns
+    uint32_t _num_rows_written = 0;

Review Comment:
   The variable _num_rows_written is confusing. It seems that _num_rows_written only works if callers use the segment writer in the order: write keys, flush keys, write columns? We should add more comments here, for example:
   
   // used to count rows written to SegmentWriter; in vertical compaction, num_rows_written = actual_row_count * number of column groups. Or we can find a better name.





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1009132314


##########
be/src/olap/rowset/beta_rowset_reader.cpp:
##########
@@ -47,68 +56,69 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
     }
 
     // convert RowsetReaderContext to StorageReadOptions
-    StorageReadOptions read_options;
-    read_options.stats = _stats;
-    read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
+    _read_options.stats = _stats;
+    _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
     if (read_context->lower_bound_keys != nullptr) {
         for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) {
-            read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
-                                                 read_context->is_lower_keys_included->at(i),
-                                                 &read_context->upper_bound_keys->at(i),
-                                                 read_context->is_upper_keys_included->at(i));
+            _read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
+                                                  read_context->is_lower_keys_included->at(i),
+                                                  &read_context->upper_bound_keys->at(i),
+                                                  read_context->is_upper_keys_included->at(i));
         }
     }
 
-    bool can_reuse_schema = true;
     // delete_hanlder is always set, but it maybe not init, so that it will return empty conditions
     // or predicates when it is not inited.
     if (read_context->delete_handler != nullptr) {
         read_context->delete_handler->get_delete_conditions_after_version(
-                _rowset->end_version(), read_options.delete_condition_predicates.get(),
-                &read_options.col_id_to_del_predicates);
+                _rowset->end_version(), _read_options.delete_condition_predicates.get(),
+                &_read_options.col_id_to_del_predicates);
         // if del cond is not empty, schema may be different in multiple rowset
-        can_reuse_schema = read_options.col_id_to_del_predicates.empty();
+        _can_reuse_schema = _read_options.col_id_to_del_predicates.empty();
     }
-
-    if (!can_reuse_schema || _context->reuse_input_schema == nullptr) {
+    // In vertical compaction, every column group need new schema
+    if (read_context->is_vertical_compaction) {
+        _can_reuse_schema = false;

Review Comment:
   Same as above.





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1007816289


##########
be/src/olap/merger.cpp:
##########
@@ -188,4 +190,137 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
     return Status::OK();
 }
 
+// split columns into several groups, make sure all keys in one group
+// unique_key should consider sequence&delete column
+void Merger::vertical_split_columns(TabletSchemaSPtr tablet_schema,
+                                    std::vector<std::vector<uint32_t>>* column_groups) {
+    uint32_t num_columns_per_group = config::vertical_compaction_num_columns_per_group;
+    uint32_t num_key_cols = tablet_schema->num_key_columns();
+    uint32_t total_cols = tablet_schema->num_columns();
+    std::vector<uint32_t> key_columns;
+    for (auto i = 0; i < num_key_cols; ++i) {
+        key_columns.emplace_back(i);
+    }
+    // in unique key, sequence & delete sign column should merge with key columns
+    int32_t sequence_col_idx = -1;
+    int32_t delete_sign_idx = -1;
+    // in key column compaction, seq_col real index is _block->columns() -2
+    // and delete_sign column is _block->columns() - 1
+    if (tablet_schema->keys_type() == KeysType::UNIQUE_KEYS) {
+        if (tablet_schema->has_sequence_col()) {
+            sequence_col_idx = tablet_schema->sequence_col_idx();
+            key_columns.emplace_back(sequence_col_idx);
+        }
+        delete_sign_idx = tablet_schema->field_index(DELETE_SIGN);
+        key_columns.emplace_back(delete_sign_idx);

Review Comment:
   Some tables have no delete sign column, e.g. tables created with an older version of Doris.
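
One way to address this is to append the delete sign column to the key group only when the lookup succeeds. The sketch below is an assumption-laden illustration, not Doris code: `field_index` and `build_key_group` are hypothetical free functions over a plain list of column names, and the lookup is assumed to return -1 for a missing column.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical lookup: index of the named column, or -1 if it is absent
// (assumed convention for tables without a hidden delete sign column).
int32_t field_index(const std::vector<std::string>& columns, const std::string& name) {
    for (size_t i = 0; i < columns.size(); ++i) {
        if (columns[i] == name) return static_cast<int32_t>(i);
    }
    return -1;
}

// Build the key column group: all key columns, plus the delete sign column
// only when the table actually has one.
std::vector<uint32_t> build_key_group(const std::vector<std::string>& columns,
                                      uint32_t num_key_cols,
                                      const std::string& delete_sign_name) {
    std::vector<uint32_t> key_columns;
    for (uint32_t i = 0; i < num_key_cols; ++i) {
        key_columns.push_back(i);
    }
    int32_t delete_sign_idx = field_index(columns, delete_sign_name);
    if (delete_sign_idx >= 0) {
        // Never push a negative index converted to uint32_t.
        key_columns.push_back(static_cast<uint32_t>(delete_sign_idx));
    }
    return key_columns;
}
```

Without the guard, a -1 result would be stored as a very large unsigned column id in the key group.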





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1009070950


##########
be/src/olap/compaction.cpp:
##########
@@ -112,14 +113,50 @@ Status Compaction::quick_rowsets_compact() {
 
 Status Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
+    uint32_t checksum_before;
+    uint32_t checksum_after;
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
+                                         _input_rowsets.back()->end_version(), &checksum_before);
+        checksum_task.execute();
+    }
+
     _tablet->data_dir()->disks_compaction_score_increment(permits);
     _tablet->data_dir()->disks_compaction_num_increment(1);
     Status st = do_compaction_impl(permits);
     _tablet->data_dir()->disks_compaction_score_increment(-permits);
     _tablet->data_dir()->disks_compaction_num_increment(-1);
+
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),

Review Comment:
   EngineChecksumTask does not merge rows for unique-key tables, so this check does not work for now?





[GitHub] [doris] morningman commented on pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
morningman commented on PR #13359:
URL: https://github.com/apache/doris/pull/13359#issuecomment-1298232389

   Hi @yixiutt, I think this is a breaking change to a core Doris feature, so I created a new branch,
   https://github.com/apache/doris/tree/compaction_opt, for developing this feature.
   
   I have also pushed the PR "opt compaction task producer and quick compaction" (#13495) to it.
   I will close this PR; please push it to the `compaction_opt` branch for testing.
   




[GitHub] [doris] zhannngchen commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
zhannngchen commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1007853684


##########
be/src/vec/olap/vertical_merge_iterator.h:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/status.h"
+#include "olap/iterators.h"
+#include "olap/schema.h"
+#include "vec/columns/columns_number.h"
+#include "vec/core/block.h"
+
+#pragma once
+
+namespace doris {
+
+namespace vectorized {
+
+// Row source represent row location in multi-segments
+// use a uint16_t to store info
+// the lower 15 bits means segment_id in segment pool, and the higher 1 bits means agg flag.
+// In unique-key, agg flags means this key should be deleted, this comes from two way: old version
+// key or delete_sign.
+class RowSource {
+public:
+    RowSource(uint16_t data) : _data(data) {}
+    RowSource(uint16_t source_num, bool agg_flag);
+
+    uint16_t get_source_num();
+    bool agg_flag();
+    void set_agg_flag(bool agg_flag);
+    uint16_t data() const;
+
+private:
+    uint16_t _data;
+    static const uint16_t SOURCE_FLAG = 0x7FFF;
+    static const uint16_t AGG_FLAG = 0x8000;
+};
+
+/* rows source buffer
+this buffer should have a memory limit, once reach memory limit, write
+buffer data to tmp file.
+usage:
+    RowSourcesBuffer buffer(tablet_id, tablet_storage_path, reader_type);
+    buffer.append()
+    buffer.append()
+    buffer.flush()
+    buffer.seek_to_begin()
+    while (buffer.has_remaining().ok()) {
+        auto cur = buffer.current().get_source_num();
+        auto same = buffer.same_source_count(cur, limit);
+        // do copy block data
+        buffer.advance(same);
+    }
+*/
+class RowSourcesBuffer {
+public:
+    RowSourcesBuffer(int64_t tablet_id, const std::string& tablet_path, ReaderType reader_type)
+            : _tablet_id(tablet_id),
+              _tablet_path(tablet_path),
+              _reader_type(reader_type),
+              _buffer(ColumnUInt16::create()) {}
+
+    ~RowSourcesBuffer() {
+        _reset_buffer();
+        if (_fd > 0) {
+            ::close(_fd);
+        }
+    }
+
+    // write batch row source
+    Status append(const std::vector<RowSource>& row_sources);
+    Status flush();
+
+    RowSource current() {
+        DCHECK(_buf_idx < _buffer->size());
+        return RowSource(_buffer->get_element(_buf_idx));
+    }
+    void advance(int32_t step = 1) {
+        DCHECK(_buf_idx + step <= _buffer->size());
+        _buf_idx += step;
+    }
+
+    uint64_t buf_idx() { return _buf_idx; }
+    uint64_t total_size() { return _total_size; }
+    uint64_t buffered_size() { return _buffer->size(); }
+    void set_agg_flag(uint64_t index, bool agg);
+
+    Status has_remaining();
+
+    Status seek_to_begin();
+
+    size_t same_source_count(uint16_t source, size_t limit);
+
+private:
+    Status _create_buffer_file();
+    Status _serialize();
+    Status _deserialize();
+    void _reset_buffer() {
+        _buffer->clear();
+        _buf_idx = 0;
+    }
+
+private:
+    int64_t _tablet_id;
+    std::string _tablet_path;
+    ReaderType _reader_type;
+    uint64_t _buf_idx = 0;
+    int _fd = -1;
+    ColumnUInt16::MutablePtr _buffer;
+    uint64_t _total_size = 0;
+};
+
+// --------------- VerticalMergeIteratorContext ------------- //
+// takes ownership of rowwise iterator
+class VerticalMergeIteratorContext {
+public:
+    VerticalMergeIteratorContext(RowwiseIterator* iter, size_t ori_return_cols, uint32_t order,
+                                 uint32_t seq_col_idx)
+            : _iter(iter),
+              _ori_return_cols(ori_return_cols),
+              _order(order),
+              _seq_col_idx(seq_col_idx),
+              _num_columns(iter->schema().num_column_ids()),
+              _num_key_columns(iter->schema().num_key_columns()) {}
+
+    VerticalMergeIteratorContext(const VerticalMergeIteratorContext&) = delete;
+    VerticalMergeIteratorContext(VerticalMergeIteratorContext&&) = delete;
+    VerticalMergeIteratorContext& operator=(const VerticalMergeIteratorContext&) = delete;
+    VerticalMergeIteratorContext& operator=(VerticalMergeIteratorContext&&) = delete;
+
+    ~VerticalMergeIteratorContext() {
+        delete _iter;
+        _iter = nullptr;
+    }
+    Status block_reset(const std::shared_ptr<Block>& block);
+    Status init(const StorageReadOptions& opts);
+    bool compare(const VerticalMergeIteratorContext& rhs) const;
+    void copy_rows(Block* block, bool advanced = true);
+    void copy_rows(Block* block, size_t count);
+
+    Status advance();
+
+    // Return if it has remaining data in this context.
+    // Only when this function return true, current_row()
+    // will return a valid row
+    bool valid() const { return _valid; }
+
+    uint32_t order() const { return _order; }
+
+    void set_is_same(bool is_same) const { _is_same = is_same; }
+
+    bool is_same() { return _is_same; }
+
+    void add_cur_batch() { _cur_batch_num++; }
+
+    void reset_cur_batch() { _cur_batch_num = 0; }

Review Comment:
   This method is not used?
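
For readers of the quoted header: the `RowSource` comment describes a 16-bit layout with the source id in the lower 15 bits and the agg flag in the top bit, but the diff only shows declarations. A minimal sketch of how such packing could work follows; `RowSourceSketch` and the constant names are hypothetical, and the PR's actual definitions may differ.

```cpp
#include <cassert>
#include <cstdint>

// Masks matching the values quoted in the header (0x7FFF and 0x8000).
constexpr uint16_t kSourceMask = 0x7FFF;  // lower 15 bits: source id
constexpr uint16_t kAggFlag = 0x8000;     // highest bit: agg flag

class RowSourceSketch {
public:
    RowSourceSketch(uint16_t source_num, bool agg_flag)
            : _data(static_cast<uint16_t>((source_num & kSourceMask) |
                                          (agg_flag ? kAggFlag : 0))) {}

    // Source id with the flag bit masked off.
    uint16_t source_num() const { return static_cast<uint16_t>(_data & kSourceMask); }

    bool agg_flag() const { return (_data & kAggFlag) != 0; }

    // Set or clear the flag bit without touching the source id.
    void set_agg_flag(bool agg_flag) {
        _data = static_cast<uint16_t>(agg_flag ? (_data | kAggFlag) : (_data & kSourceMask));
    }

    uint16_t data() const { return _data; }

private:
    uint16_t _data;
};
```

With 15 bits for the source id, this layout can address at most 32768 distinct sources.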





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1007814703


##########
be/src/common/config.h:
##########
@@ -246,12 +246,20 @@ CONF_Bool(enable_storage_vectorization, "true");
 CONF_Bool(enable_low_cardinality_optimize, "true");
 
 // be policy
+// whether check compaction checksum
+CONF_mBool(enable_compaction_checksum, "false");
 // whether disable automatic compaction task
 CONF_mBool(disable_auto_compaction, "false");
 // whether enable vectorized compaction
 CONF_Bool(enable_vectorized_compaction, "true");
 // whether enable vectorized schema change/material-view/rollup task.
 CONF_Bool(enable_vectorized_alter_table, "true");
+// whether enable vertical compaction
+CONF_mBool(enable_vertical_compaction, "false");
+// In vertical compaction, column number for every group
+CONF_Int32(vertical_compaction_num_columns_per_group, "5");

Review Comment:
   This config can be changed online, and according to the code in merger.cpp the change takes effect.





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1009127913


##########
be/src/olap/rowset/beta_rowset_reader.cpp:
##########
@@ -36,7 +36,16 @@ BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset)
     _rowset->acquire();
 }
 
-Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
+void BetaRowsetReader::reset_read_options() {
+    _read_options.delete_condition_predicates = std::make_shared<AndBlockColumnPredicate>();
+    _read_options.column_predicates.clear();
+    _read_options.col_id_to_predicates.clear();
+    _read_options.col_id_to_del_predicates.clear();
+    _read_options.key_ranges.clear();
+}
+
+Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context,
+                                               std::vector<RowwiseIterator*>* out_iters) {
     RETURN_NOT_OK(_rowset->load());
     _context = read_context;

Review Comment:
   Does this assignment duplicate the one in init? By the way, get methods should not set members.





[GitHub] [doris] dataroaring commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
dataroaring commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1009221836


##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -299,11 +322,12 @@ uint64_t SegmentWriter::estimate_segment_size() {
     return size;
 }
 
-Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
-    // check disk capacity
-    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
-        return Status::InternalError("disk {} exceed capacity limit.", _data_dir->path_hash());
+Status SegmentWriter::finalize_columns(uint64_t* index_size) {
+    if (_has_key) {

Review Comment:
   Please add a comment describing the usage, e.g. append keys, then call finalize_columns. Alternatively, we could add a method finalize_key_columns that takes row_count as an argument.
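
One possible shape for the second option is sketched below. It is a hypothetical, heavily simplified class that only counts rows, not the real SegmentWriter; unlike the suggestion, `finalize_key_columns` here takes no argument and latches the internal counter, and `finalize_value_columns` is an invented name for the per-group check.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical writer that tracks row counts across column groups.
// Intended call order: append key rows, finalize_key_columns(), then for
// each value group: append rows, finalize_value_columns().
class ColumnGroupWriterSketch {
public:
    void append_rows(uint32_t n) { _num_rows_written += n; }

    // Called once, after the key group: fixes the segment's row count.
    void finalize_key_columns() {
        _row_count = _num_rows_written;
        _num_rows_written = 0;
        _keys_finalized = true;
    }

    // Called after each value group: returns false if the group was written
    // before the keys or does not contain exactly row_count() rows.
    bool finalize_value_columns() {
        bool ok = _keys_finalized && _num_rows_written == _row_count;
        _num_rows_written = 0;
        return ok;
    }

    uint32_t row_count() const { return _row_count; }

private:
    uint32_t _row_count = 0;         // rows in the segment, set by the key group
    uint32_t _num_rows_written = 0;  // rows appended for the current group only
    bool _keys_finalized = false;
};
```

Separating the two finalize calls makes the required order explicit and gives each value group a row-count check.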





[GitHub] [doris] morningman closed pull request #13359: [feature](vertical_compaction) support vertical compaction

Posted by GitBox <gi...@apache.org>.
morningman closed pull request #13359: [feature](vertical_compaction) support vertical compaction
URL: https://github.com/apache/doris/pull/13359

