You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/06/15 13:48:42 UTC

[incubator-doris] branch master updated: [Feature] compaction quickly for small data import (#9804)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dfebb9852 [Feature] compaction quickly for small data import (#9804)
4dfebb9852 is described below

commit 4dfebb98521e0b774ceebcbb7ed6995d32350093
Author: chenlinzhong <49...@qq.com>
AuthorDate: Wed Jun 15 21:48:34 2022 +0800

    [Feature] compaction quickly for small data import (#9804)
    
    * compaction quickly for small data import #9791
    1. Merge small rowset versions as soon as possible so that small batches of data can be imported at a higher frequency.
    2. A small version is a rowset with fewer rows than config::quick_compaction_max_rows (default 1000).
---
 be/src/agent/task_worker_pool.cpp                | 28 ++++++++++++-
 be/src/common/config.h                           | 12 ++++++
 be/src/common/status.h                           |  8 ++--
 be/src/olap/compaction.cpp                       | 51 ++++++++++++++++++++++++
 be/src/olap/compaction.h                         |  1 +
 be/src/olap/delta_writer.cpp                     |  6 +++
 be/src/olap/olap_server.cpp                      | 22 ++++++++++
 be/src/olap/storage_engine.h                     |  4 ++
 be/src/olap/tablet.cpp                           | 51 +++++++++++++++++++++++-
 be/src/olap/tablet.h                             |  9 ++++-
 be/src/olap/task/engine_publish_version_task.cpp | 10 ++++-
 be/src/olap/task/engine_publish_version_task.h   |  4 +-
 12 files changed, 195 insertions(+), 11 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index c29a65cdc6..79953f6709 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -692,11 +692,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
         VLOG_NOTICE << "get publish version task, signature:" << agent_task_req.signature;
 
         std::vector<TTabletId> error_tablet_ids;
+        std::vector<TTabletId> succ_tablet_ids;
         uint32_t retry_time = 0;
         Status res = Status::OK();
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
             error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
+            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
+                                                 &succ_tablet_ids);
             res = _env->storage_engine()->execute_task(&engine_task);
             if (res.ok()) {
                 break;
@@ -718,7 +720,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
                          << ", error_code=" << res;
             finish_task_request.__set_error_tablet_ids(error_tablet_ids);
         } else {
-            LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
+            int submit_tablets = 0;
+            if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) {
+                for (int i = 0; i < succ_tablet_ids.size(); i++) {
+                    TabletSharedPtr tablet =
+                            StorageEngine::instance()->tablet_manager()->get_tablet(
+                                    succ_tablet_ids[i]);
+                    if (tablet != nullptr) {
+                        submit_tablets++;
+                        tablet->publised_count++;
+                        if (tablet->publised_count % config::quick_compaction_batch_size == 0) {
+                            StorageEngine::instance()->submit_quick_compaction_task(tablet);
+                            LOG(INFO) << "trigger quick compaction succ, tabletid:"
+                                      << succ_tablet_ids[i]
+                                      << ", publised:" << tablet->publised_count;
+                        }
+                    } else {
+                        LOG(WARNING) << "trigger quick compaction failed, tabletid:"
+                                     << succ_tablet_ids[i];
+                    }
+                }
+                LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature
+                          << ", size:" << succ_tablet_ids.size();
+            }
         }
 
         res.to_thrift(&finish_task_request.task_status);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0ae23127e7..8a922f076e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -300,6 +300,9 @@ CONF_mInt32(convert_rowset_thread_num, "0");
 // initial sleep interval in seconds of scan alpha rowset
 CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");
 
+// Number of threads (both min and max) of the quick compaction thread pool.
+CONF_mInt32(quick_compaction_max_threads, "10");
+
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 CONF_Int32(max_meta_checkpoint_threads, "-1");
 
@@ -743,6 +746,15 @@ CONF_Int32(parquet_reader_max_buffer_size, "50");
 // if it is lower than a specific threshold, the predicate will be disabled.
 CONF_mInt32(bloom_filter_predicate_check_row_num, "1000");
 
+// Whether to enable the quick compaction feature.
+CONF_Bool(enable_quick_compaction, "false");
+// Consecutive rowsets with fewer than quick_compaction_max_rows rows are merged by quick compaction.
+CONF_Int32(quick_compaction_max_rows, "1000");
+// Trigger a quick compaction on a tablet after every quick_compaction_batch_size successful publishes.
+CONF_Int32(quick_compaction_batch_size, "10");
+// Minimum number of picked rowsets required to actually run a quick compaction.
+CONF_Int32(quick_compaction_min_rowsets, "10");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index cca2383670..58fdb7d2f4 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -24,7 +24,7 @@ namespace doris {
 #define APPLY_FOR_ERROR_CODES(M)                                         \
     M(OLAP_SUCCESS, 0, "", false)                                        \
     M(OLAP_ERR_OTHER_ERROR, -1, "", true)                                \
-    M(OLAP_REQUEST_FAILED, -2, "", true)                                 \
+    M(OLAP_REQUEST_FAILED, -2, "", false)                                \
     M(OLAP_ERR_OS_ERROR, -100, "", true)                                 \
     M(OLAP_ERR_DIR_NOT_EXIST, -101, "", true)                            \
     M(OLAP_ERR_FILE_NOT_EXIST, -102, "", true)                           \
@@ -92,7 +92,7 @@ namespace doris {
     M(OLAP_ERR_CE_LOAD_TABLE_ERROR, -303, "", true)                      \
     M(OLAP_ERR_CE_NOT_FINISHED, -304, "", true)                          \
     M(OLAP_ERR_CE_TABLET_ID_EXIST, -305, "", true)                       \
-    M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", true)                     \
+    M(OLAP_ERR_CE_TRY_CE_LOCK_ERROR, -306, "", false)                    \
     M(OLAP_ERR_TABLE_VERSION_DUPLICATE_ERROR, -400, "", true)            \
     M(OLAP_ERR_TABLE_VERSION_INDEX_MISMATCH_ERROR, -401, "", true)       \
     M(OLAP_ERR_TABLE_INDEX_VALIDATE_ERROR, -402, "", true)               \
@@ -176,8 +176,8 @@ namespace doris {
     M(OLAP_ERR_HEADER_LOAD_JSON_HEADER, -1410, "", true)                 \
     M(OLAP_ERR_HEADER_INIT_FAILED, -1411, "", true)                      \
     M(OLAP_ERR_HEADER_PB_PARSE_FAILED, -1412, "", true)                  \
-    M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", true)                 \
-    M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", true)                   \
+    M(OLAP_ERR_HEADER_HAS_PENDING_DATA, -1413, "", false)                \
+    M(OLAP_ERR_SCHEMA_SCHEMA_INVALID, -1500, "", false)                  \
     M(OLAP_ERR_SCHEMA_SCHEMA_FIELD_INVALID, -1501, "", true)             \
     M(OLAP_ERR_ALTER_MULTI_TABLE_ERR, -1600, "", true)                   \
     M(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS, -1601, "", true)             \
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c279ac7d62..3ac480194e 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -55,6 +55,57 @@ Status Compaction::execute_compact() {
     return st;
 }
 
+// Merges the run of small rowsets picked by Tablet::pick_quick_compaction_rowsets if it has at
+// least config::quick_compaction_min_rowsets rowsets, returning any lock/clone/pick/merge error.
+Status Compaction::quick_rowsets_compact() {
+    std::unique_lock<std::mutex> lock(_tablet->get_cumulative_compaction_lock(), std::try_to_lock);
+    if (!lock.owns_lock()) {
+        LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
+                     << _tablet->full_name();
+        return Status::OLAPInternalError(OLAP_ERR_CE_TRY_CE_LOCK_ERROR);
+    }
+    // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
+    // for compaction may change. In this case, current compaction task should not be executed.
+    if (_tablet->get_clone_occurred()) {
+        _tablet->set_clone_occurred(false);
+        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
+    }
+    _input_rowsets.clear();
+    int version_count = _tablet->version_count();
+    MonotonicStopWatch watch;
+    watch.start();
+    int64_t permits = 0;
+    RETURN_IF_ERROR(_tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits));
+    std::vector<Version> missedVersions;
+    find_longest_consecutive_version(&_input_rowsets, &missedVersions);
+    if (!missedVersions.empty()) {
+        LOG(WARNING) << "quick_rowsets_compaction, find missed version"
+                     << ",input_size:" << _input_rowsets.size();
+    }
+    int nums = _input_rowsets.size();
+    if (nums == 0 || nums < config::quick_compaction_min_rowsets) { // int compare, no wrap
+        return Status::OK();
+    }
+    Status st = check_version_continuity(_input_rowsets);
+    if (!st.ok()) {
+        LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous";
+        return st;
+    }
+    st = do_compaction(permits);
+    if (!st.ok()) {
+        gc_output_rowset();
+        LOG(WARNING) << "quick_rowsets_compaction failed";
+        return st;
+    }
+    LOG(INFO) << "quick_compaction succ" << ", before_versions:" << version_count
+              << ", after_versions:" << _tablet->version_count()
+              << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms"
+              << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size
+              << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id();
+    _tablet->set_last_quick_compaction_success_time(UnixMillis());
+    return Status::OK();
+}
+
 Status Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
     _tablet->data_dir()->disks_compaction_score_increment(permits);
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index bd37cae275..c70a82defa 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -48,6 +48,7 @@ public:
 
     // This is only for http CompactionAction
     Status compact();
+    Status quick_rowsets_compact();
 
     virtual Status prepare_compact() = 0;
     Status execute_compact();
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 132a79a390..40f2851655 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/delta_writer.h"
 
+#include "olap/base_compaction.h"
+#include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
@@ -98,6 +100,10 @@ Status DeltaWriter::init() {
             MemTracker::create_tracker(-1, "DeltaWriter:" + std::to_string(_tablet->tablet_id()));
     // check tablet version number
     if (_tablet->version_count() > config::max_tablet_version_num) {
+        //trigger quick compaction
+        if (config::enable_quick_compaction) {
+            StorageEngine::instance()->submit_quick_compaction_task(_tablet);
+        }
         LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
                      << ", exceed limit: " << config::max_tablet_version_num
                      << ". tablet: " << _tablet->full_name();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2ad264d2bc..eb1d47c065 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -88,6 +88,16 @@ Status StorageEngine::start_bg_threads() {
         LOG(INFO) << "alpha rowset scan thread started";
     }
 
+    ThreadPoolBuilder("CompactionTaskThreadPool")
+            .set_min_threads(max_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_compaction_thread_pool);
+
+    ThreadPoolBuilder("SmallCompactionTaskThreadPool")
+            .set_min_threads(config::quick_compaction_max_threads)
+            .set_max_threads(config::quick_compaction_max_threads)
+            .build(&_quick_compaction_thread_pool);
+
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "compaction_tasks_producer_thread",
@@ -661,4 +671,16 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet,
     return _submit_compaction_task(tablet, compaction_type);
 }
 
+Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) {
+    // Runs on _quick_compaction_thread_pool; the result is returned rather than discarded.
+    CumulativeCompaction compact(tablet);
+    return compact.quick_rowsets_compact();
+}
+
+Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
+    // Propagate the pool's submission Status instead of always reporting success.
+    return _quick_compaction_thread_pool->submit_func(
+            std::bind<void>(&StorageEngine::_handle_quick_compaction, this, tablet));
+}
+
 } // namespace doris
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index df6e78874b..40d4303dab 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -194,6 +194,7 @@ public:
     void check_cumulative_compaction_config();
 
     Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
+    Status submit_quick_compaction_task(TabletSharedPtr tablet);
 
 private:
     // Instance should be inited from `static open()`
@@ -271,6 +272,8 @@ private:
 
     Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
 
+    Status _handle_quick_compaction(TabletSharedPtr);
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -379,6 +382,7 @@ private:
     HeartbeatFlags* _heartbeat_flags;
 
     std::unique_ptr<ThreadPool> _compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
 
     scoped_refptr<Thread> _alpha_rowset_scan_thread;
     std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 405e6acdc5..85d8f7e4ad 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -872,10 +872,59 @@ void Tablet::calculate_cumulative_point() {
     if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return;
     }
-
     set_cumulative_layer_point(ret_cumulative_point);
 }
 
+// Picks the longest run of small rowsets (fewer than config::quick_compaction_max_rows rows) of a
+// running tablet, considering only rowsets with start version > 0 above the cumulative point. A
+// large rowset or a delete-predicate version ends the current run. The picked rowsets are appended
+// to *input_rowsets and their segment count is added to *permits. Does nothing when quick
+// compaction is disabled; fails if the tablet was not initialized successfully.
+Status Tablet::pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
+                                             int64_t* permits) {
+    int max_rows = config::quick_compaction_max_rows;
+    if (!config::enable_quick_compaction || max_rows <= 0) {
+        return Status::OK();
+    }
+    if (!init_succeeded()) {
+        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS);
+    }
+    std::shared_lock rdlock(_meta_lock);
+    if (tablet_state() != TABLET_RUNNING) {
+        return Status::OK();
+    }
+    std::vector<RowsetSharedPtr> sorted_rowsets;
+    sorted_rowsets.reserve(_rs_version_map.size());
+    for (const auto& rs : _rs_version_map) {
+        sorted_rowsets.push_back(rs.second);
+    }
+    std::sort(sorted_rowsets.begin(), sorted_rowsets.end(), Rowset::comparator);
+    // One pass tracking the current run and the longest one; no fixed-size run table is needed.
+    std::vector<RowsetSharedPtr> current_run;
+    std::vector<RowsetSharedPtr> longest_run;
+    for (const auto& rowset : sorted_rowsets) {
+        if (rowset->start_version() <= 0 || rowset->start_version() <= cumulative_layer_point()) {
+            continue; // not eligible; does not end the current run
+        }
+        if (!version_for_delete_predicate(rowset->version()) && rowset->num_rows() < max_rows) {
+            current_run.push_back(rowset);
+            continue;
+        }
+        if (current_run.size() > longest_run.size()) {
+            longest_run.swap(current_run);
+        }
+        current_run.clear();
+    }
+    if (current_run.size() > longest_run.size()) {
+        longest_run.swap(current_run);
+    }
+    for (const auto& rowset : longest_run) {
+        *permits += rowset->num_segments();
+        input_rowsets->push_back(rowset);
+    }
+    return Status::OK();
+}
+
 Status Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings,
                            uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
     DCHECK(ranges != nullptr);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index fbc9a57a29..1156de36f1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -73,6 +73,8 @@ public:
     // Used in clone task, to update local meta when finishing a clone job
     Status revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
                               const std::vector<Version>& versions_to_delete);
+    Status pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
+                                         int64_t* permits);
 
     const int64_t cumulative_layer_point() const;
     void set_cumulative_layer_point(int64_t new_point);
@@ -190,6 +192,10 @@ public:
         _last_cumu_compaction_success_millis = millis;
     }
 
+    void set_last_quick_compaction_success_time(int64_t millis) {
+        _last_quick_compaction_success_time_millis.store(millis); // e.g. UnixMillis() at success
+    }
+
     int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
     void set_last_base_compaction_success_time(int64_t millis) {
         _last_base_compaction_success_millis = millis;
@@ -337,7 +343,7 @@ private:
     std::atomic<int64_t> _last_cumu_compaction_success_millis;
     // timestamp of last base compaction success
     std::atomic<int64_t> _last_base_compaction_success_millis;
-
+    std::atomic<int64_t> _last_quick_compaction_success_time_millis;
     std::atomic<int64_t> _cumulative_point;
     std::atomic<int32_t> _newly_created_rowset_num;
     std::atomic<int64_t> _last_checkpoint_time;
@@ -367,6 +373,7 @@ private:
 public:
     IntCounter* flush_bytes;
     IntCounter* flush_count;
+    std::atomic<int64_t> publised_count = 0;
 };
 
 inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index de4d6449f2..5c7397c8ad 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -28,8 +28,11 @@ namespace doris {
 using std::map;
 
 EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
-                                                   std::vector<TTabletId>* error_tablet_ids)
-        : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {}
+                                                   std::vector<TTabletId>* error_tablet_ids,
+                                                   std::vector<TTabletId>* succ_tablet_ids)
+        : _publish_version_req(publish_version_req),
+          _error_tablet_ids(error_tablet_ids),
+          _succ_tablet_ids(succ_tablet_ids) {}
 
 Status EnginePublishVersionTask::finish() {
     Status res = Status::OK();
@@ -106,6 +109,9 @@ Status EnginePublishVersionTask::finish() {
                 res = publish_status;
                 continue;
             }
+            if (_succ_tablet_ids != nullptr) {
+                _succ_tablet_ids->push_back(tablet_info.tablet_id);
+            }
             partition_related_tablet_infos.erase(tablet_info);
             VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
                         << ", transaction_id=" << transaction_id << ", version=" << version.first
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 2601ed8f60..4086f466d3 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -27,7 +27,8 @@ namespace doris {
 class EnginePublishVersionTask : public EngineTask {
 public:
     EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
-                             vector<TTabletId>* error_tablet_ids);
+                             vector<TTabletId>* error_tablet_ids,
+                             std::vector<TTabletId>* succ_tablet_ids = nullptr);
     ~EnginePublishVersionTask() {}
 
     virtual Status finish() override;
@@ -35,6 +36,7 @@ public:
 private:
     const TPublishVersionRequest& _publish_version_req;
     vector<TTabletId>* _error_tablet_ids;
+    vector<TTabletId>* _succ_tablet_ids;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org