You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/01 04:49:10 UTC

[incubator-doris] 07/07: [Enhance][Load] Reduce the number of segments when loading a large volume data in one batch (#6947)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 4719771be8049e5b17fdfc0caaa9b727a1a2a854
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Mon Nov 1 10:51:50 2021 +0800

    [Enhance][Load] Reduce the number of segments when loading a large volume data in one batch (#6947)
    
    ## Case
    
    In the load process, each tablet will have a memtable to save the incoming data,
    and if the data in a memtable is larger than 100MB, it will be flushed to disk as a `segment` file. And then
    a new memtable will be created to save the following data/
    
    Assume that this is a table with N buckets(tablets). So the max size of all memtables will be `N * 100MB`.
    If N is large, it will cost too much memory.
    
    So for memory limit purpose, when the size of all memtables reach a threshold(2GB as default), Doris will
    try to flush all current memtables to disk(even if their size are not reach 100MB).
    
    So you will see that the memtable will be flushed when it's size reach `2GB/N`, which maybe much smaller
    than 100MB, resulting in too many small segment files.
    
    ## Solution
    
    When decide to flush memtable to reduce memory consumption, NOT to flush all memtable, but to flush part
    of them.
    For example, there are 50 tablets(with 50 memtables). The memory limit is 1GB, so when each memtable reach
    20MB, the total size reach 1GB, and flush will occur.
    
    If I only flush 25 of 50 memtables, then next time when the total size reach 1GB, there will be 25 memtables with
    size 10MB, and other 25 memtables with size 30MB. So I can flush those memtables with size 30MB, which is larger
    than 20MB.
    
    The main idea is to introduce some jitter during flush to ensure the small unevenness of each memtable, so as to ensure that flush will only be triggered when the memtable is large enough.
    
    In my test, loading a table with 48 buckets, mem limit 2G, in previous version, the average memtable size is 44MB,
    after modification, the average size is 82MB
---
 be/src/common/config.h                             |  4 +-
 be/src/olap/memtable.cpp                           |  5 +++
 be/src/olap/memtable.h                             |  4 ++
 be/src/runtime/load_channel.cpp                    |  2 +-
 be/src/runtime/tablets_channel.cpp                 | 48 ++++++++++++++++++----
 be/src/runtime/tablets_channel.h                   |  2 +-
 docs/en/best-practices/star-schema-benchmark.md    | 12 +++---
 docs/zh-CN/best-practices/star-schema-benchmark.md | 16 ++++----
 .../org/apache/doris/analysis/OutFileClause.java   |  3 +-
 9 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 42c43fb..977282c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -175,7 +175,7 @@ CONF_mInt32(doris_max_pushdown_conjuncts_return_rate, "90");
 // (Advanced) Maximum size of per-query receive-side buffer
 CONF_mInt32(exchg_node_buffer_size_bytes, "10485760");
 // push_write_mbytes_per_sec
-CONF_mInt32(push_write_mbytes_per_sec, "10");
+CONF_mInt32(push_write_mbytes_per_sec, "100");
 
 CONF_mInt64(column_dictionary_key_ratio_threshold, "0");
 CONF_mInt64(column_dictionary_key_size_threshold, "0");
@@ -440,7 +440,7 @@ CONF_mInt64(memory_maintenance_sleep_time_s, "10");
 CONF_Int32(memory_max_alignment, "16");
 
 // write buffer size before flush
-CONF_mInt64(write_buffer_size, "104857600");
+CONF_mInt64(write_buffer_size, "209715200");
 
 // following 2 configs limit the memory consumption of load process on a Backend.
 // eg: memory limit to 80% of mem limit config but up to 100GB(default)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2c6069f..3ce5db1 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -61,6 +61,7 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ
 }
 
 void MemTable::insert(const Tuple* tuple) {
+    _rows++;
     bool overwritten = false;
     uint8_t* _tuple_buf = nullptr;
     if (_keys_type == KeysType::DUP_KEYS) {
@@ -120,6 +121,8 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_
 }
 
 OLAPStatus MemTable::flush() {
+    VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
+                  << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
@@ -141,6 +144,8 @@ OLAPStatus MemTable::flush() {
     }
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
+    VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id
+                  << ", flushsize: " << _flush_size;
     return OLAP_SUCCESS;
 }
 
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 6d5beeb..02783d7 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -118,6 +118,10 @@ private:
 
     // the data size flushed on disk of this memtable
     int64_t _flush_size = 0;
+    // Number of rows inserted to this memtable.
+    // This is not the rows in this memtable, because rows may be merged
+    // in unique or aggragate key model.
+    int64_t _rows = 0;
 
 }; // class MemTable
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 01e8d66..5834777 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -121,7 +121,7 @@ void LoadChannel::handle_mem_exceed_limit(bool force) {
 
     std::shared_ptr<TabletsChannel> channel;
     if (_find_largest_consumption_channel(&channel)) {
-        channel->reduce_mem_usage();
+        channel->reduce_mem_usage(_mem_tracker->limit());
     } else {
         // should not happen, add log to observe
         LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index db5e23b..f74861f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -184,7 +184,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
     return Status::OK();
 }
 
-Status TabletsChannel::reduce_mem_usage() {
+Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         // TabletsChannel is closed without LoadChannel's lock,
@@ -192,18 +192,48 @@ Status TabletsChannel::reduce_mem_usage() {
         return _close_status;
     }
 
-    // Flush all memtables
+    // Sort the DeltaWriters by mem consumption in descend order.
+    std::vector<DeltaWriter*> writers;
     for (auto& it : _tablet_writers) {
-        it.second->flush_memtable_and_wait(false);
+        writers.push_back(it.second);
     }
+    std::sort(writers.begin(), writers.end(),
+              [](const DeltaWriter* lhs, const DeltaWriter* rhs) {
+                  return lhs->mem_consumption() > rhs->mem_consumption();
+              });
 
-    for (auto& it : _tablet_writers) {
-        OLAPStatus st = it.second->wait_flush();
+    // Decide which writes should be flushed to reduce mem consumption.
+    // The main idea is to flush at least one third of the mem_limit.
+    // This is mainly to solve the following scenarios.
+    // Suppose there are N tablets in this TabletsChannel, and the mem limit is M.
+    // If the data is evenly distributed, when each tablet memory accumulates to M/N,
+    // the reduce memory operation will be triggered.
+    // At this time, the value of M/N may be much smaller than the value of `write_buffer_size`.
+    // If we flush all the tablets at this time, each tablet will generate a lot of small files.
+    // So here we only flush part of the tablet, and the next time the reduce memory operation is triggered,
+    // the tablet that has not been flushed before will accumulate more data, thereby reducing the number of flushes.
+    int64_t mem_to_flushed = mem_limit / 3;
+    int counter = 0;
+    int64_t  sum = 0;
+    for (auto writer : writers) {
+        if (writer->mem_consumption() <= 0) {
+            break;
+        }
+        ++counter;
+        sum += writer->mem_consumption();
+        if (sum > mem_to_flushed) {
+            break;
+        }
+    }
+    VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " << sum;
+    for (int i = 0; i < counter; i++) {
+        writers[i]->flush_memtable_and_wait(false);
+    }
+
+    for (int i = 0; i < counter; i++) {
+        OLAPStatus st = writers[i]->wait_flush();
         if (st != OLAP_SUCCESS) {
-            // flush failed, return error
-            std::stringstream ss;
-            ss << "failed to reduce mem consumption by flushing memtable. err: " << st;
-            return Status::InternalError(ss.str());
+            return Status::InternalError(fmt::format("failed to reduce mem consumption by flushing memtable. err: {}", st));
         }
     }
     return Status::OK();
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b0402bd..9f87e8a 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -78,7 +78,7 @@ public:
     // eg. flush the largest memtable immediately.
     // return Status::OK if mem is reduced.
     // no-op when this channel has been closed or cancelled
-    Status reduce_mem_usage();
+    Status reduce_mem_usage(int64_t mem_limit);
 
     int64_t mem_consumption() const { return _mem_tracker->consumption(); }
 
diff --git a/docs/en/best-practices/star-schema-benchmark.md b/docs/en/best-practices/star-schema-benchmark.md
index f0ea9e1..fb28fee 100644
--- a/docs/en/best-practices/star-schema-benchmark.md
+++ b/docs/en/best-practices/star-schema-benchmark.md
@@ -60,7 +60,7 @@ Execute the following script to generate the SSB data set:
 sh gen-ssb-data.sh -s 100 -c 100
 ```
 
-> Note 1: `sh gen-ssb-data.sh -h View help`
+> Note 1: Run `sh gen-ssb-data.sh -h` for help.
 >
 > Note 2: The data will be generated under the directory `ssb-data/` with a suffix of `.tbl`. The total file size is about 60GB. The generation time may vary from a few minutes to an hour.
 >
@@ -92,9 +92,11 @@ Under the `-s 100` parameter, the generated data set size is:
 
         Import the lineorder table data with the following command:
 
-        `sh load-fact-data.sh -c 3`
+        `sh load-fact-data.sh -c 5`
 
-        `-c 3` means to start 5 concurrent threads to import (the default is 3). In the case of a single BE node, the import time of lineorder data generated by `sh gen-ssb-data.sh -s 100 -c 100` using `sh load-fact-data.sh -c 3` is about 10 minutes. The memory overhead is about 5-6GB. If you turn on more threads, you can speed up the import speed, but it will increase additional memory overhead.
+        `-c 5` means to start 5 concurrent threads to import (the default is 3). In the case of a single BE node, the import time of lineorder data generated by `sh gen-ssb-data.sh -s 100 -c 100` using `sh load-fact-data.sh -c 3` is about 10 minutes. The memory overhead is about 5-6GB. If you turn on more threads, you can speed up the import speed, but it will increase additional memory overhead.
+
+        > Note: To get a faster import speed, you can add `flush_thread_num_per_store=5` in be.conf and restart BE. This configuration indicates the number of disk write threads for each data directory, and the default is 2. Larger data can increase write data throughput, but may increase IO Util. (Reference value: 1 mechanical disk, when the default is 2, the IO Util during the import process is about 12%, when it is set to 5, the IO Util is about 26%. If it is an SSD disk, it is almost 0) .
 
 5. Check the imported data
 
@@ -152,7 +154,7 @@ The following test report is based on Doris [branch-0.15](https://github.com/apa
     | q4.2 | 1430 | 670 | 8 | BLOOM_FILTER |
     | q4.2 | 1750 | 1030 | 8 | BLOOM_FILTER |
 
-    > Note 1: "This test set is far from your generation environment, don't be superstitious!"
+    > Note 1: "This test set is far from your production environment, please be skeptical!"
     >
     > Note 2: The test result is the average value of multiple executions (Page Cache will play a certain acceleration role). And the data has undergone sufficient compaction (if you test immediately after importing the data, the query delay may be higher than the test result)
     >
@@ -160,4 +162,4 @@ The following test report is based on Doris [branch-0.15](https://github.com/apa
     >
     > Note 4: Parallelism means query concurrency, which is set by `set parallel_fragment_exec_instance_num=8`.
     >
-    > Note 5: Runtime Filter Mode is the type of Runtime Filter, set by `set runtime_filter_type="BLOOM_FILTER"`. ([Runtime Filter](http://doris.incubator.apache.org/master/en/administrator-guide/runtime-filter.html) function has a significant effect on the SSB test set. Because in this test level, Join is calculated The data in the sub-right table can filter the left table very well. You can try to turn off this function through `set runtime_filter_mode=off` to see the change in query l [...]
+    > Note 5: Runtime Filter Mode is the type of Runtime Filter, set by `set runtime_filter_type="BLOOM_FILTER"`. ([Runtime Filter](http://doris.incubator.apache.org/master/en/administrator-guide/runtime-filter.html) function has a significant effect on the SSB test set. Because in this test set, The data from the right table of Join can filter the left table very well. You can try to turn off this function through `set runtime_filter_mode=off` to see the change in query latency.)
diff --git a/docs/zh-CN/best-practices/star-schema-benchmark.md b/docs/zh-CN/best-practices/star-schema-benchmark.md
index 30f43aa..ae7d608 100644
--- a/docs/zh-CN/best-practices/star-schema-benchmark.md
+++ b/docs/zh-CN/best-practices/star-schema-benchmark.md
@@ -30,7 +30,7 @@ under the License.
 
 本文档主要介绍如何在 Doris 中通过 SSB 进程初步的性能测试。
 
-> 注1:包括 SSB 在内标准测试集通常和实际业务场景差距较大,并且部分测试会针对测试集进行参数调优。所以标准测试集的测试结果仅能反映数据库在特定场景下的性能表现。建议用户使用实际业务数据进行进一步的测试。
+> 注1:包括 SSB 在内的标准测试集通常和实际业务场景差距较大,并且部分测试会针对测试集进行参数调优。所以标准测试集的测试结果仅能反映数据库在特定场景下的性能表现。建议用户使用实际业务数据进行进一步的测试。
 > 
 > 注2:本文档涉及的操作都在 CentOS 7 环境进行。
 
@@ -60,7 +60,7 @@ sh build-ssb-dbgen.sh
 sh gen-ssb-data.sh -s 100 -c 100
 ```
 
-> 注1:`sh gen-ssb-data.sh -h 查看帮助`
+> 注1:通过 `sh gen-ssb-data.sh -h` 查看脚本帮助。
 > 
 > 注2:数据会以 `.tbl` 为后缀生成在  `ssb-data/` 目录下。文件总大小约60GB。生成时间可能在数分钟到1小时不等。
 > 
@@ -84,7 +84,7 @@ sh gen-ssb-data.sh -s 100 -c 100
 
     1. 导入 4 张维度表数据(customer, part, supplier and date)
     
-        因为这4张维表数据量较小,导入较简答,我们使用以下命令先导入这4表的数据:
+        因为这4张维表数据量较小,导入较简单,我们使用以下命令先导入这4表的数据:
         
         `sh load-dimension-data.sh`
         
@@ -92,9 +92,11 @@ sh gen-ssb-data.sh -s 100 -c 100
 
         通过以下命令导入 lineorder 表数据:
         
-        `sh load-fact-data.sh -c 3`
+        `sh load-fact-data.sh -c 5`
         
-        `-c 3` 表示启动 5 个并发线程导入(默认为3)。在单 BE 节点情况下,由 `sh gen-ssb-data.sh -s 100 -c 100` 生成的 lineorder 数据,使用 `sh load-fact-data.sh -c 3` 的导入时间约为 10min。内存开销约为 5-6GB。如果开启更多线程,可以加快导入速度,但会增加额外的内存开销。
+        `-c 5` 表示启动 5 个并发线程导入(默认为3)。在单 BE 节点情况下,由 `sh gen-ssb-data.sh -s 100 -c 100` 生成的 lineorder 数据,使用 `sh load-fact-data.sh -c 3` 的导入时间约为 10min。内存开销约为 5-6GB。如果开启更多线程,可以加快导入速度,但会增加额外的内存开销。
+
+    > 注:为获得更快的导入速度,你可以在 be.conf 中添加 `flush_thread_num_per_store=5` 后重启BE。该配置表示每个数据目录的写盘线程数,默认为2。较大的数据可以提升写数据吞吐,但可能会增加 IO Util。(参考值:1块机械磁盘,在默认为2的情况下,导入过程中的 IO Util 约为12%,设置为5时,IO Util 约为26%。如果是 SSD 盘,则几乎为 0)。
 
 5. 检查导入数据
 
@@ -152,7 +154,7 @@ SSB 测试集共 4 组 14 个 SQL。查询语句在 [queries/](https://github.co
     | q4.2 | 1430 | 670 | 8 | BLOOM_FILTER |
     | q4.2 | 1750 | 1030 | 8 | BLOOM_FILTER |
 
-    > 注1:“这个测试集和你的生成环境相去甚远,不要迷信他!”
+    > 注1:“这个测试集和你的生产环境相去甚远,请对他保持怀疑态度!”
     > 
     > 注2:测试结果为多次执行取平均值(Page Cache 会起到一定加速作用)。并且数据经过充分的 compaction (如果在刚导入数据后立刻测试,则查询延迟可能高于本测试结果)
     >
@@ -160,5 +162,5 @@ SSB 测试集共 4 组 14 个 SQL。查询语句在 [queries/](https://github.co
     >
     > 注4:Parallelism 表示查询并发度,通过 `set parallel_fragment_exec_instance_num=8` 设置。
     >
-    > 注5:Runtime Filter Mode 是 Runtime Filter 的类型,通过 `set runtime_filter_type="BLOOM_FILTER"` 设置。([Runtime Filter](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/runtime-filter.html) 功能对 SSB 测试集效果显著。因为该测试级中,Join 算子右表的数据可以对左表起到很好的过滤作用。你可以尝试通过 `set runtime_filter_mode=off` 关闭该功能,看看查询延迟的变化。)
+    > 注5:Runtime Filter Mode 是 Runtime Filter 的类型,通过 `set runtime_filter_type="BLOOM_FILTER"` 设置。([Runtime Filter](http://doris.incubator.apache.org/master/zh-CN/administrator-guide/runtime-filter.html) 功能对 SSB 测试集效果显著。因为该测试集中,Join 算子右表的数据可以对左表起到很好的过滤作用。你可以尝试通过 `set runtime_filter_mode=off` 关闭该功能,看看查询延迟的变化。)
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 18bfc2d..b2268ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -308,7 +308,8 @@ public class OutFileClause {
 
         if (filePath.startsWith(LOCAL_FILE_PREFIX)) {
             if (!Config.enable_outfile_to_local) {
-                throw new AnalysisException("Exporting results to local disk is not allowed.");
+                throw new AnalysisException("Exporting results to local disk is not allowed." 
+                    + " To enable this feature, you need to add `enable_outfile_to_local=true` in fe.conf and restart FE");
             }
             isLocalOutput = true;
             filePath = filePath.substring(LOCAL_FILE_PREFIX.length() - 1); // leave last '/'

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org