Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/10/28 09:14:43 UTC

[GitHub] [doris] zhannngchen commented on a diff in pull request #13359: [feature](vertical_compaction) support vertical compaction

zhannngchen commented on code in PR #13359:
URL: https://github.com/apache/doris/pull/13359#discussion_r1006574754


##########
be/src/olap/rowset/segment_v2/segment_writer.cpp:
##########
@@ -299,11 +322,12 @@ uint64_t SegmentWriter::estimate_segment_size() {
     return size;
 }
 
-Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
-    // check disk capacity
-    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
-        return Status::InternalError("disk {} exceed capacity limit.", _data_dir->path_hash());
+Status SegmentWriter::finalize_columns(uint64_t* index_size) {
+    if (_has_key) {
+        _row_count = _num_rows_written;
     }
+    _num_rows_written = 0;

Review Comment:
   Should we add a check here?
   ```
       if (_has_key) {
            _row_count = _num_rows_written;
        } else {
            CHECK_EQ(_row_count, _num_rows_written);
        }
   ```
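   
   For context, a minimal standalone sketch (not part of the PR) of the invariant this check would enforce; the struct and field names are illustrative, and `CHECK_EQ` is glog's, as used elsewhere in the BE:
   ```
   #include <cstdint>
   #include <glog/logging.h>
   
   // In vertical compaction the key column group decides the segment's row
   // count; every later (value) column group must write exactly that many rows.
   struct ColumnGroupWriterSketch {
       bool has_key = false;
       uint32_t row_count = 0;        // rows decided by the key column group
       uint32_t num_rows_written = 0; // rows written by the current group
   
       void finalize_group() {
           if (has_key) {
               row_count = num_rows_written;
           } else {
               // A mismatch means a value column group lost or duplicated rows.
               CHECK_EQ(row_count, num_rows_written);
           }
           num_rows_written = 0; // reset before the next column group
       }
   };
   ```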



##########
regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy:
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_vertical_compaction_uniq_keys") {
+    def tableName = "vertical_compaction_uniq_keys_regression_test"
+
+    def set_be_config = { ->
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        for (String[] backend in backends) {
+            StringBuilder setConfigCommand = new StringBuilder();
+            setConfigCommand.append("curl -X POST http://")
+            setConfigCommand.append(backend[2])
+            setConfigCommand.append(":")
+            setConfigCommand.append(backend[5])
+            setConfigCommand.append("/api/update_config?")
+            String command1 = setConfigCommand.toString() + "enable_vertical_compaction=true"
+            logger.info(command1)
+            def process1 = command1.execute()
+            int code = process1.waitFor()
+            assertEquals(code, 0)
+        }
+    }
+    def reset_be_config = { ->
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        for (String[] backend in backends) {
+            StringBuilder setConfigCommand = new StringBuilder();
+            setConfigCommand.append("curl -X POST http://")
+            setConfigCommand.append(backend[2])
+            setConfigCommand.append(":")
+            setConfigCommand.append(backend[5])
+            setConfigCommand.append("/api/update_config?")
+            String command1 = setConfigCommand.toString() + "enable_vertical_compaction=false"
+            logger.info(command1)
+            def process1 = command1.execute()
+            int code = process1.waitFor()
+            assertEquals(code, 0)
+        }
+    }
+
+    try {
+        //BackendId,Cluster,IP,HeartbeatPort,BePort,HttpPort,BrpcPort,LastStartTime,LastHeartbeat,Alive,SystemDecommissioned,ClusterDecommissioned,TabletNum,DataUsedCapacity,AvailCapacity,TotalCapacity,UsedPct,MaxDiskUsedPct,Tag,ErrMsg,Version,Status
+        String[][] backends = sql """ show backends; """
+        assertTrue(backends.size() > 0)
+        String backend_id;
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        for (String[] backend in backends) {
+            backendId_to_backendIP.put(backend[0], backend[2])
+            backendId_to_backendHttpPort.put(backend[0], backend[5])
+        }
+
+        backend_id = backendId_to_backendIP.keySet()[0]
+        StringBuilder showConfigCommand = new StringBuilder();
+        showConfigCommand.append("curl -X GET http://")
+        showConfigCommand.append(backendId_to_backendIP.get(backend_id))
+        showConfigCommand.append(":")
+        showConfigCommand.append(backendId_to_backendHttpPort.get(backend_id))
+        showConfigCommand.append("/api/show_config")
+        logger.info(showConfigCommand.toString())
+        def process = showConfigCommand.toString().execute()
+        int code = process.waitFor()
+        String err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
+        String out = process.getText()
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        boolean disableAutoCompaction = true
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+            }
+        }
+        set_be_config.call()
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                `user_id` LARGEINT NOT NULL COMMENT "user id",
+                `date` DATE NOT NULL COMMENT "data load date and time",
+                `datev2` DATEV2 NOT NULL COMMENT "data load date and time",
+                `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "data load date and time",
+                `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "data load date and time",
+                `city` VARCHAR(20) COMMENT "user city",
+                `age` SMALLINT COMMENT "user age",
+                `sex` TINYINT COMMENT "user gender",
+                `last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user last visit time",
+                `last_update_date` DATETIME DEFAULT "1970-01-01 00:00:00" COMMENT "user last update time",
+                `datetime_val1` DATETIMEV2(3) DEFAULT "1970-01-01 00:00:00.111" COMMENT "user last visit time",
+                `datetime_val2` DATETIME(6) DEFAULT "1970-01-01 00:00:00" COMMENT "user last update time",
+                `last_visit_date_not_null` DATETIME NOT NULL DEFAULT "1970-01-01 00:00:00" COMMENT "user last visit time",
+                `cost` BIGINT DEFAULT "0" COMMENT "user total spending",
+                `max_dwell_time` INT DEFAULT "0" COMMENT "user maximum dwell time",
+                `min_dwell_time` INT DEFAULT "99999" COMMENT "user minimum dwell time")
+            UNIQUE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, `datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
+            PROPERTIES ( "replication_num" = "1" );
+        """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, '2020-01-01', '2020-01-01', '2017-10-01 11:11:11.170000', '2017-10-01 11:11:11.110111', '2020-01-01', 1, 30, 20)
+            """
+
+        sql """ INSERT INTO ${tableName} VALUES
+             (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, '2020-01-02', '2020-01-02', '2017-10-01 11:11:11.160000', '2017-10-01 11:11:11.100111', '2020-01-02', 1, 31, 19)

Review Comment:
   Shouldn't this test also cover the delete sign and delete queries?



##########
be/src/olap/compaction.cpp:
##########
@@ -112,14 +113,50 @@ Status Compaction::quick_rowsets_compact() {
 
 Status Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
+    uint32_t checksum_before;
+    uint32_t checksum_after;
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
+                                         _input_rowsets.back()->end_version(), &checksum_before);
+        checksum_task.execute();
+    }
+
     _tablet->data_dir()->disks_compaction_score_increment(permits);
     _tablet->data_dir()->disks_compaction_num_increment(1);
     Status st = do_compaction_impl(permits);
     _tablet->data_dir()->disks_compaction_score_increment(-permits);
     _tablet->data_dir()->disks_compaction_num_increment(-1);
+
+    if (config::enable_compaction_checksum) {
+        EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
+                                         _input_rowsets.back()->end_version(), &checksum_after);
+        checksum_task.execute();
+        if (checksum_before != checksum_after) {

Review Comment:
   Should use DCHECK here
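   
   Not runnable on its own, but roughly what that block could look like with a DCHECK; the `LOG(WARNING)` branch stands in for whatever handling the PR already has inside the `if`, which this hunk does not show:
   ```
       if (config::enable_compaction_checksum) {
           EngineChecksumTask checksum_task(_tablet->tablet_id(), _tablet->schema_hash(),
                                            _input_rowsets.back()->end_version(), &checksum_after);
           checksum_task.execute();
           // Fail fast in debug builds if compaction changed the data.
           DCHECK_EQ(checksum_before, checksum_after)
                   << "compaction checksum mismatch, tablet_id=" << _tablet->tablet_id();
           if (checksum_before != checksum_after) {
               // Illustrative release-build handling only.
               LOG(WARNING) << "compaction checksum mismatch, tablet_id=" << _tablet->tablet_id()
                            << ", before=" << checksum_before << ", after=" << checksum_after;
           }
       }
   ```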



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

