You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/16 08:35:54 UTC

[incubator-doris] branch dev-1.0.1 updated (a504469053 -> 42ce8361ec)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from a504469053 [Opt][Vectorized] Opt vectorized the unique_table in storage vectorized (#10132)
     new db4ab9dc12 [fix] Fix disk used pct only consider the data that used by Doris (#9705)
     new 42ce8361ec [Feature] compaction quickly for small data import (#9804)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/agent/task_worker_pool.cpp                  | 28 +++++++++++-
 be/src/common/config.h                             | 12 +++++
 be/src/olap/compaction.cpp                         | 51 +++++++++++++++++++++
 be/src/olap/compaction.h                           |  1 +
 be/src/olap/delta_writer.cpp                       |  6 +++
 be/src/olap/olap_server.cpp                        | 23 ++++++++++
 be/src/olap/storage_engine.h                       |  4 ++
 be/src/olap/tablet.cpp                             | 53 +++++++++++++++++++++-
 be/src/olap/tablet.h                               | 11 ++++-
 be/src/olap/task/engine_publish_version_task.cpp   | 10 +++-
 be/src/olap/task/engine_publish_version_task.h     |  5 +-
 .../java/org/apache/doris/catalog/DiskInfo.java    | 14 ++++--
 .../apache/doris/clone/BackendLoadStatistic.java   |  2 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |  1 +
 14 files changed, 206 insertions(+), 15 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/02: [fix] Fix disk used pct only consider the data that used by Doris (#9705)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit db4ab9dc12bc431b090179e546300f18228d7575
Author: ccoffline <45...@users.noreply.github.com>
AuthorDate: Wed Jun 15 16:28:56 2022 +0800

    [fix] Fix disk used pct only consider the data that used by Doris (#9705)
---
 .../src/main/java/org/apache/doris/catalog/DiskInfo.java   | 14 +++++++++-----
 .../java/org/apache/doris/clone/BackendLoadStatistic.java  |  2 +-
 .../java/org/apache/doris/clone/RebalancerTestUtil.java    |  1 +
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
index 3c26635ee6..e007286219 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DiskInfo.java
@@ -90,6 +90,10 @@ public class DiskInfo implements Writable {
         this.dataUsedCapacityB = dataUsedCapacityB;
     }
 
+    public long getDiskUsedCapacityB() {
+        return totalCapacityB - diskAvailableCapacityB;
+    }
+
     public long getAvailableCapacityB() {
         return diskAvailableCapacityB;
     }
@@ -99,7 +103,7 @@ public class DiskInfo implements Writable {
     }
 
     public double getUsedPct() {
-        return (totalCapacityB - diskAvailableCapacityB) / (double) (totalCapacityB <= 0 ? 1 : totalCapacityB);
+        return this.getDiskUsedCapacityB() / (double) (totalCapacityB <= 0 ? 1 : totalCapacityB);
     }
 
     public DiskState getState() {
@@ -148,11 +152,11 @@ public class DiskInfo implements Writable {
         LOG.debug("flood stage: {}, diskAvailableCapacityB: {}, totalCapacityB: {}",
                 floodStage, diskAvailableCapacityB, totalCapacityB);
         if (floodStage) {
-            return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes &&
-                    (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_flood_stage_usage_percent / 100.0);
+            return diskAvailableCapacityB < Config.storage_flood_stage_left_capacity_bytes
+                && this.getUsedPct() > (Config.storage_flood_stage_usage_percent / 100.0);
         } else {
-            return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes ||
-                    (double) (totalCapacityB - diskAvailableCapacityB) / totalCapacityB > (Config.storage_high_watermark_usage_percent / 100.0);
+            return diskAvailableCapacityB < Config.storage_min_left_capacity_bytes
+                || this.getUsedPct() > (Config.storage_high_watermark_usage_percent / 100.0);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 8c575d8cb9..94a836173d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -188,7 +188,7 @@ public class BackendLoadStatistic {
 
             RootPathLoadStatistic pathStatistic = new RootPathLoadStatistic(beId, diskInfo.getRootPath(),
                     diskInfo.getPathHash(), diskInfo.getStorageMedium(),
-                    diskInfo.getTotalCapacityB(), diskInfo.getDataUsedCapacityB(), diskInfo.getState());
+                    diskInfo.getTotalCapacityB(), diskInfo.getDiskUsedCapacityB(), diskInfo.getState());
             pathStatistics.add(pathStatistic);
         }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 4b3e4c693b..744389951d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -47,6 +47,7 @@ public class RebalancerTestUtil {
         diskInfo.setPathHash(id);
         diskInfo.setTotalCapacityB(totalCap);
         diskInfo.setDataUsedCapacityB(usedCap);
+        diskInfo.setAvailableCapacityB(totalCap - usedCap);
         disks.put(diskInfo.getRootPath(), diskInfo);
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/02: [Feature] compaction quickly for small data import (#9804)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 42ce8361ecb69025eecb7270d787b65ad7ac0619
Author: chenlinzhong <49...@qq.com>
AuthorDate: Wed Jun 15 21:48:34 2022 +0800

    [Feature] compaction quickly for small data import (#9804)
    
    * compaction quickly for small data import #9791
    1.merge small versions of rowset as soon as possible to increase the import frequency of small version data
    2.small version means that the number of rows is less than config::small_compaction_rowset_rows  default 1000
---
 be/src/agent/task_worker_pool.cpp                | 28 ++++++++++++-
 be/src/common/config.h                           | 12 ++++++
 be/src/olap/compaction.cpp                       | 51 +++++++++++++++++++++++
 be/src/olap/compaction.h                         |  1 +
 be/src/olap/delta_writer.cpp                     |  6 +++
 be/src/olap/olap_server.cpp                      | 23 ++++++++++
 be/src/olap/storage_engine.h                     |  4 ++
 be/src/olap/tablet.cpp                           | 53 +++++++++++++++++++++++-
 be/src/olap/tablet.h                             | 11 ++++-
 be/src/olap/task/engine_publish_version_task.cpp | 10 ++++-
 be/src/olap/task/engine_publish_version_task.h   |  5 ++-
 11 files changed, 195 insertions(+), 9 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index c7557ea443..55f3a786d9 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -700,11 +700,13 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
 
         Status st;
         std::vector<TTabletId> error_tablet_ids;
+        std::unordered_map<TTabletId, int32_t> succ_tablet_ids;
         uint32_t retry_time = 0;
         OLAPStatus res = OLAP_SUCCESS;
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
             error_tablet_ids.clear();
-            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids);
+            EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
+                                                 &succ_tablet_ids);
             res = _env->storage_engine()->execute_task(&engine_task);
             if (res == OLAP_SUCCESS) {
                 break;
@@ -727,7 +729,29 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
             st = Status::RuntimeError(strings::Substitute("publish version failed. error=$0", res));
             finish_task_request.__set_error_tablet_ids(error_tablet_ids);
         } else {
-            LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature;
+            int submit_tablets = 0;
+            if (config::enable_quick_compaction && config::quick_compaction_batch_size > 0) {
+                for (auto& entry : succ_tablet_ids) {
+                    TabletSharedPtr tablet =
+                            StorageEngine::instance()->tablet_manager()->get_tablet(
+                                    entry.first, entry.second);
+                    if (tablet != nullptr) {
+                        submit_tablets++;
+                        tablet->publised_count++;
+                        if (tablet->publised_count % config::quick_compaction_batch_size == 0) {
+                            StorageEngine::instance()->submit_quick_compaction_task(tablet);
+                            LOG(INFO) << "trigger quick compaction succ, tabletid:"
+                                      << entry.first
+                                      << ", published:" << tablet->publised_count;
+                        }
+                    } else {
+                        LOG(WARNING) << "trigger quick compaction failed, tabletid:"
+                                     << entry.first;
+                    }
+                }
+                LOG(INFO) << "publish_version success. signature:" << agent_task_req.signature
+                          << ", size:" << succ_tablet_ids.size();
+            }
         }
 
         st.to_thrift(&finish_task_request.task_status);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d0f6c563c5..29fed299bc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -295,6 +295,9 @@ CONF_mInt32(convert_rowset_thread_num, "0");
 // initial sleep interval in seconds of scan alpha rowset
 CONF_mInt32(scan_alpha_rowset_min_interval_sec, "3");
 
+// This config can be set to limit thread number in  smallcompaction thread pool.
+CONF_mInt32(quick_compaction_max_threads, "10");
+
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 CONF_Int32(max_meta_checkpoint_threads, "-1");
 
@@ -712,6 +715,15 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 
+//whether turn on quick compaction feature
+CONF_Bool(enable_quick_compaction, "false");
+// For continuous versions that rows less than quick_compaction_max_rows will  trigger compaction quickly
+CONF_Int32(quick_compaction_max_rows, "1000");
+// min compaction versions
+CONF_Int32(quick_compaction_batch_size, "10");
+// do compaction min rowsets
+CONF_Int32(quick_compaction_min_rowsets, "10");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c484fd5703..3ebcf65389 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -54,6 +54,57 @@ OLAPStatus Compaction::execute_compact() {
     return st;
 }
 
+OLAPStatus Compaction::quick_rowsets_compact() {
+    MutexLock lock_cumulative(_tablet->get_cumulative_lock(), TRY_LOCK);
+    if (!lock_cumulative.own_lock()) {
+        LOG(WARNING) << "The tablet is under cumulative compaction. tablet="
+                     << _tablet->full_name();
+        return OLAP_ERR_CE_TRY_CE_LOCK_ERROR;
+    }
+
+    // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
+    // for compaction may change. In this case, current compaction task should not be executed.
+    if (_tablet->get_clone_occurred()) {
+        _tablet->set_clone_occurred(false);
+        return OLAP_ERR_CUMULATIVE_CLONE_OCCURRED;
+    }
+
+    _input_rowsets.clear();
+    int version_count = _tablet->version_count();
+    MonotonicStopWatch watch;
+    watch.start();
+    int64_t permits = 0;
+    _tablet->pick_quick_compaction_rowsets(&_input_rowsets, &permits);
+    std::vector<Version> missedVersions;
+    find_longest_consecutive_version(&_input_rowsets, &missedVersions);
+    if (missedVersions.size() != 0) {
+        LOG(WARNING) << "quick_rowsets_compaction, find missed version"
+                     << ",input_size:" << _input_rowsets.size();
+    }
+    int nums = _input_rowsets.size();
+    if (_input_rowsets.size() >= config::quick_compaction_min_rowsets) {
+        OLAPStatus st = check_version_continuity(_input_rowsets);
+        if (st != OLAP_SUCCESS) {
+            LOG(WARNING) << "quick_rowsets_compaction failed, cause version not continuous";
+            return st;
+        }
+        st = do_compaction(permits);
+        if (st != OLAP_SUCCESS) {
+            gc_output_rowset();
+            LOG(WARNING) << "quick_rowsets_compaction failed";
+        } else {
+            LOG(INFO) << "quick_compaction succ"
+                      << ", before_versions:" << version_count
+                      << ", after_versions:" << _tablet->version_count()
+                      << ", cost:" << (watch.elapsed_time() / 1000 / 1000) << "ms"
+                      << ", merged: " << nums << ", batch:" << config::quick_compaction_batch_size
+                      << ", segments:" << permits << ", tabletid:" << _tablet->tablet_id();
+            _tablet->set_last_quick_compaction_success_time(UnixMillis());
+        }
+    }
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus Compaction::do_compaction(int64_t permits) {
     TRACE("start to do compaction");
     _tablet->data_dir()->disks_compaction_score_increment(permits);
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index e537997b04..8b77b78a39 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -50,6 +50,7 @@ public:
 
     // This is only for http CompactionAction
     OLAPStatus compact();
+    OLAPStatus quick_rowsets_compact();
 
     virtual OLAPStatus prepare_compact() = 0;
     OLAPStatus execute_compact();
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 58b89e5d64..448a90bcd5 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -17,6 +17,8 @@
 
 #include "olap/delta_writer.h"
 
+#include "olap/base_compaction.h"
+#include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
 #include "olap/memtable.h"
 #include "olap/memtable_flush_executor.h"
@@ -101,6 +103,10 @@ OLAPStatus DeltaWriter::init() {
                                              _parent_mem_tracker);
     // check tablet version number
     if (_tablet->version_count() > config::max_tablet_version_num) {
+        //trigger quick compaction
+        if (config::enable_quick_compaction) {
+            StorageEngine::instance()->submit_quick_compaction_task(_tablet);
+        }
         LOG(WARNING) << "failed to init delta writer. version count: " << _tablet->version_count()
                      << ", exceed limit: " << config::max_tablet_version_num
                      << ". tablet: " << _tablet->full_name();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 550879ebfa..c4b157de28 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -88,6 +88,16 @@ Status StorageEngine::start_bg_threads() {
         LOG(INFO) << "alpha rowset scan thread started";
     }
 
+    ThreadPoolBuilder("CompactionTaskThreadPool")
+            .set_min_threads(max_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_compaction_thread_pool);
+
+    ThreadPoolBuilder("SmallCompactionTaskThreadPool")
+            .set_min_threads(config::quick_compaction_max_threads)
+            .set_max_threads(config::quick_compaction_max_threads)
+            .build(&_quick_compaction_thread_pool);
+
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "compaction_tasks_producer_thread",
@@ -650,4 +660,17 @@ Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionT
     return _submit_compaction_task(tablet, compaction_type);
 }
 
+Status StorageEngine::_handle_quick_compaction(TabletSharedPtr tablet) {
+    std::shared_ptr<CumulativeCompaction> compact; 
+    StorageEngine::instance()->create_cumulative_compaction(tablet, compact);
+    compact->quick_rowsets_compact();
+    return Status::OK();
+}
+
+Status StorageEngine::submit_quick_compaction_task(TabletSharedPtr tablet) {
+    _quick_compaction_thread_pool->submit_func(
+            std::bind<void>(&StorageEngine::_handle_quick_compaction, this, tablet));
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 03ee581d59..fff1f952ec 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -191,6 +191,7 @@ public:
     void check_cumulative_compaction_config();
 
     Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
+    Status submit_quick_compaction_task(TabletSharedPtr tablet);
 
 private:
     // Instance should be inited from `static open()`
@@ -268,6 +269,8 @@ private:
 
     Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type);
 
+    Status _handle_quick_compaction(TabletSharedPtr);
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -365,6 +368,7 @@ private:
     HeartbeatFlags* _heartbeat_flags;
 
     std::unique_ptr<ThreadPool> _compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
 
     scoped_refptr<Thread> _alpha_rowset_scan_thread;
     std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6f84b1764f..855177c86a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -882,12 +882,61 @@ void Tablet::calculate_cumulative_point() {
     if (ret_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return;
     }
-
     set_cumulative_layer_point(ret_cumulative_point);
 }
 
+//find rowsets that rows less then "config::quick_compaction_max_rows"
+OLAPStatus Tablet::pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
+                                             int64_t* permits) {
+    int max_rows = config::quick_compaction_max_rows;
+    if (!config::enable_quick_compaction || max_rows <= 0) {
+        return OLAP_SUCCESS;
+    }
+    if (!init_succeeded()) {
+        return OLAP_ERR_CUMULATIVE_INVALID_PARAMETERS;
+    }
+    int max_series_num = 1000;
+
+    std::vector<std::vector<RowsetSharedPtr>> quick_compaction_rowsets(max_series_num);
+    int idx = 0;
+    std::shared_lock rdlock(_meta_lock);
+    std::vector<RowsetSharedPtr> sortedRowset;
+    for (auto& rs : _rs_version_map) {
+        sortedRowset.push_back(rs.second);
+    }
+    std::sort(sortedRowset.begin(), sortedRowset.end(), Rowset::comparator);
+    if (tablet_state() == TABLET_RUNNING) {
+        for (int i = 0; i < sortedRowset.size(); i++) {
+            bool is_delete = version_for_delete_predicate(sortedRowset[i]->version());
+            if (!is_delete && sortedRowset[i]->start_version() > 0 &&
+                sortedRowset[i]->start_version() > cumulative_layer_point()) {
+                if (sortedRowset[i]->num_rows() < max_rows) {
+                    quick_compaction_rowsets[idx].push_back(sortedRowset[i]);
+                } else {
+                    idx++;
+                    if (idx > max_series_num) {
+                        break;
+                    }
+                }
+            }
+        }
+        if (quick_compaction_rowsets.size() == 0) return OLAP_SUCCESS;
+        std::vector<RowsetSharedPtr> result = quick_compaction_rowsets[0];
+        for (int i = 0; i < quick_compaction_rowsets.size(); i++) {
+            if (quick_compaction_rowsets[i].size() > result.size()) {
+                result = quick_compaction_rowsets[i];
+            }
+        }
+        for (int i = 0; i < result.size(); i++) {
+            *permits += result[i]->num_segments();
+            input_rowsets->push_back(result[i]);
+        }
+    }
+    return OLAP_SUCCESS;
+}
+
 OLAPStatus Tablet::split_range(const OlapTuple& start_key_strings, const OlapTuple& end_key_strings,
-                               uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
+                           uint64_t request_block_row_count, std::vector<OlapTuple>* ranges) {
     DCHECK(ranges != nullptr);
 
     size_t key_num = 0;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 114ca26e72..303780f712 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -69,7 +69,9 @@ public:
     void save_meta();
     // Used in clone task, to update local meta when finishing a clone job
     OLAPStatus revise_tablet_meta(const std::vector<RowsetMetaSharedPtr>& rowsets_to_clone,
-                                  const std::vector<Version>& versions_to_delete);
+                              const std::vector<Version>& versions_to_delete);
+    OLAPStatus pick_quick_compaction_rowsets(std::vector<RowsetSharedPtr>* input_rowsets,
+                                         int64_t* permits);
 
     inline const int64_t cumulative_layer_point() const;
     inline void set_cumulative_layer_point(int64_t new_point);
@@ -198,6 +200,10 @@ public:
         _last_cumu_compaction_success_millis = millis;
     }
 
+    void set_last_quick_compaction_success_time(int64_t millis) {
+        _last_quick_compaction_success_time_millis = millis;
+    }
+
     int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
     void set_last_base_compaction_success_time(int64_t millis) {
         _last_base_compaction_success_millis = millis;
@@ -330,7 +336,7 @@ private:
     std::atomic<int64_t> _last_cumu_compaction_success_millis;
     // timestamp of last base compaction success
     std::atomic<int64_t> _last_base_compaction_success_millis;
-
+    std::atomic<int64_t> _last_quick_compaction_success_time_millis;
     std::atomic<int64_t> _cumulative_point;
     std::atomic<int32_t> _newly_created_rowset_num;
     std::atomic<int64_t> _last_checkpoint_time;
@@ -360,6 +366,7 @@ private:
 public:
     IntCounter* flush_bytes;
     IntCounter* flush_count;
+    std::atomic<int64_t> publised_count = 0;
 };
 
 inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 004f0ad195..14a3bd3ce0 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -28,8 +28,11 @@ namespace doris {
 using std::map;
 
 EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
-                                                   std::vector<TTabletId>* error_tablet_ids)
-        : _publish_version_req(publish_version_req), _error_tablet_ids(error_tablet_ids) {}
+                                                   std::vector<TTabletId>* error_tablet_ids,
+                                                   std::unordered_map<TTabletId, int32_t>* succ_tablet_ids)
+        : _publish_version_req(publish_version_req),
+          _error_tablet_ids(error_tablet_ids),
+          _succ_tablet_ids(succ_tablet_ids) {}
 
 OLAPStatus EnginePublishVersionTask::finish() {
     OLAPStatus res = OLAP_SUCCESS;
@@ -107,6 +110,9 @@ OLAPStatus EnginePublishVersionTask::finish() {
                 res = publish_status;
                 continue;
             }
+            if (_succ_tablet_ids != nullptr) {
+                _succ_tablet_ids->insert({tablet_info.tablet_id, tablet_info.schema_hash});
+            }
             partition_related_tablet_infos.erase(tablet_info);
             VLOG_NOTICE << "publish version successfully on tablet. tablet=" << tablet->full_name()
                         << ", transaction_id=" << transaction_id << ", version=" << version.first
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 8bf1783ddc..0f7d7b421a 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_PUBLISH_VERSION_TASK_H
 #define DORIS_BE_SRC_OLAP_TASK_ENGINE_PUBLISH_VERSION_TASK_H
 
+#include <unordered_map>
 #include "gen_cpp/AgentService_types.h"
 #include "olap/olap_define.h"
 #include "olap/task/engine_task.h"
@@ -27,7 +28,8 @@ namespace doris {
 class EnginePublishVersionTask : public EngineTask {
 public:
     EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
-                             vector<TTabletId>* error_tablet_ids);
+                             vector<TTabletId>* error_tablet_ids,
+                             std::unordered_map<TTabletId, int32_t>* succ_tablet_ids = nullptr);
     ~EnginePublishVersionTask() {}
 
     virtual OLAPStatus finish() override;
@@ -35,6 +37,7 @@ public:
 private:
     const TPublishVersionRequest& _publish_version_req;
     vector<TTabletId>* _error_tablet_ids;
+    std::unordered_map<TTabletId, int32_t>* _succ_tablet_ids;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org