You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/03/11 14:34:28 UTC

[incubator-doris] branch master updated: [Enhancement] Support Pallralel Merge In Exchange Node (#5468)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 689602e  [Enhancement] Support Pallralel Merge In Exchange Node (#5468)
689602e is described below

commit 689602e686a4ae7298ad5fc1c7abf2223a06da5f
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Mar 11 22:34:18 2021 +0800

    [Enhancement] Support Pallralel Merge In Exchange Node (#5468)
    
    Support Parallel Merge In Exchange Node
---
 be/src/exec/exchange_node.cpp                      |   6 +-
 be/src/runtime/data_stream_recvr.cc                |  64 +++++++--
 be/src/runtime/data_stream_recvr.h                 |   3 +
 be/src/runtime/load_channel_mgr.cpp                |   2 +-
 be/src/runtime/runtime_state.h                     |   2 +
 be/src/runtime/sorted_run_merger.cc                | 153 ++++++++++++++++++++-
 be/src/runtime/sorted_run_merger.h                 |  34 ++++-
 docs/en/administrator-guide/running-profile.md     |   4 +
 docs/en/administrator-guide/variables.md           |   6 +
 docs/zh-CN/administrator-guide/running-profile.md  |   4 +
 docs/zh-CN/administrator-guide/variables.md        |   8 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   6 +
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 13 files changed, 273 insertions(+), 21 deletions(-)

diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 94fe0a0..d4314a9 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -83,7 +83,11 @@ Status ExchangeNode::open(RuntimeState* state) {
         TupleRowComparator less_than(_sort_exec_exprs, _is_asc_order, _nulls_first);
         // create_merger() will populate its merging heap with batches from the _stream_recvr,
         // so it is not necessary to call fill_input_row_batch().
-        RETURN_IF_ERROR(_stream_recvr->create_merger(less_than));
+        if (state->enable_exchange_node_parallel_merge()) {
+            RETURN_IF_ERROR(_stream_recvr->create_parallel_merger(less_than, state->batch_size(), mem_tracker().get()));
+        } else {
+            RETURN_IF_ERROR(_stream_recvr->create_merger(less_than));
+        }
     } else {
         RETURN_IF_ERROR(fill_input_row_batch(state));
     }
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index eb671f1..c5ef74f 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -322,24 +322,72 @@ void DataStreamRecvr::SenderQueue::close() {
 
 Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
     DCHECK(_is_merging);
-    vector<SortedRunMerger::RunBatchSupplier> input_batch_suppliers;
-    input_batch_suppliers.reserve(_sender_queues.size());
-
+    vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
     // Create the merger that will a single stream of sorted rows.
     _merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile, false));
 
     for (int i = 0; i < _sender_queues.size(); ++i) {
-        input_batch_suppliers.push_back(
+        child_input_batch_suppliers.emplace_back(
                 bind(mem_fn(&SenderQueue::get_batch), _sender_queues[i], _1));
     }
-    RETURN_IF_ERROR(_merger->prepare(input_batch_suppliers));
+    RETURN_IF_ERROR(_merger->prepare(child_input_batch_suppliers));
+    return Status::OK();
+}
+
+Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, MemTracker* mem_tracker) {
+    // Builds a two-level merge: each child merger merges a slice of the sender queues and the
+    // top-level _merger merges the child outputs. Returns the first prepare() error, if any.
+    DCHECK(_is_merging);
+    vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
+
+    // Top-level merger that produces the single stream of sorted rows.
+    _merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile, false));
+
+    // Parallel merge moves most of the comparison work away from the top merger:
+    //   top merger   : fed by the child mergers
+    //   child merger : fed by a slice of the sender queues, merged ahead of the top merger
+    //   sender queue : holds the data received from other nodes
+    // Example with 81 sender queues and data size 1000: a single merger costs 1000 * log(81);
+    // with two children the cost becomes MAX(1000 * log(2), 500 * log(41)).
+    // For N sender queues and M child mergers the balance point is log(N / M) = M * log(M),
+    // i.e. N = 8 -> M = 2, N = 81 -> M = 3, N = 1024 -> M = 4. N is normally below 1024,
+    // so only two settings are used: M = 2 when N < 81 and M = 3 when N >= 81.
+    const size_t parallel_thread = _sender_queues.size() < 81 ? 2 : 3;
+    // Slice size per child; the "+ 1" keeps the number of slices at or below parallel_thread.
+    const size_t step = _sender_queues.size() / parallel_thread + 1;
+    for (size_t i = 0; i < _sender_queues.size(); i += step) {
+        // Child merger covering up to 'step' sender queues starting at index i.
+        std::unique_ptr<SortedRunMerger> child_merger(new ChildSortedRunMerger(
+                less_than, &_row_desc, _profile, mem_tracker, batch_size, false));
+        vector<SortedRunMerger::RunBatchSupplier> input_batch_suppliers;
+        for (size_t j = i; j < std::min(i + step, _sender_queues.size()); ++j) {
+            input_batch_suppliers.emplace_back(
+                    bind(mem_fn(&SenderQueue::get_batch), _sender_queues[j], _1));
+        }
+        // Propagate a failed child prepare() instead of discarding its Status.
+        RETURN_IF_ERROR(child_merger->prepare(input_batch_suppliers));
+
+        // The supplier binds a raw pointer; _child_mergers owns the child merger.
+        child_input_batch_suppliers.emplace_back(
+                bind(mem_fn(&SortedRunMerger::get_batch), child_merger.get(), _1));
+        _child_mergers.emplace_back(std::move(child_merger));
+    }
+    RETURN_IF_ERROR(_merger->prepare(child_input_batch_suppliers, true));
+
     return Status::OK();
 }
 
 void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
-    BOOST_FOREACH (SenderQueue* sender_queue, _sender_queues) {
-        if (sender_queue->current_batch() != NULL) {
-            sender_queue->current_batch()->transfer_resource_ownership(transfer_batch);
+    // _child_mergers is not empty, means use parallel merge need transfer resource from
+    // _sender queue.
+    // the need transfer resources from child_merger input_row_batch
+    if (!_child_mergers.empty()) {
+        _merger->transfer_all_resources(transfer_batch);
+    } else {
+        BOOST_FOREACH (SenderQueue* sender_queue, _sender_queues) {
+            if (sender_queue->current_batch() != NULL) {
+                sender_queue->current_batch()->transfer_resource_ownership(transfer_batch);
+            }
         }
     }
 }
diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h
index 32fa571..62d38c7 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -87,6 +87,7 @@ public:
     // queues. The exprs used in less_than must have already been prepared and opened.
     Status create_merger(const TupleRowComparator& less_than);
 
+    Status create_parallel_merger(const TupleRowComparator& less_than, uint32_t batch_size, MemTracker* mem_tracker);
     // Fill output_batch with the next batch of rows obtained by merging the per-sender
     // input streams. Must only be called if _is_merging is true.
     Status get_next(RowBatch* output_batch, bool* eos);
@@ -165,6 +166,8 @@ private:
     // SortedRunMerger used to merge rows from different senders.
     boost::scoped_ptr<SortedRunMerger> _merger;
 
+    std::vector<std::unique_ptr<SortedRunMerger>> _child_mergers;
+
     // Pool of sender queues.
     ObjectPool _sender_queue_pool;
 
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 74e21d0..e006205 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -44,7 +44,7 @@ static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
 static int64_t calc_job_max_load_memory(int64_t mem_limit_in_req, int64_t total_mem_limit) {
     // default mem limit is used to be compatible with old request.
     // new request should be set load_mem_limit.
-    const int64_t default_load_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB
+    constexpr int64_t default_load_mem_limit = 2 * 1024 * 1024 * 1024L; // 2GB
     int64_t load_mem_limit = default_load_mem_limit;
     if (mem_limit_in_req != -1) {
         // mem-limit of a certain load should between config::write_buffer_size
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cbcf3d7..dc7a2c8 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -342,6 +342,8 @@ public:
 
     bool enable_spill() const { return _query_options.enable_spilling; }
 
+    bool enable_exchange_node_parallel_merge() const { return _query_options.enable_enable_exchange_node_parallel_merge; }
+
     // the following getters are only valid after Prepare()
     InitialReservations* initial_reservations() const { return _initial_reservations; }
 
diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc
index 76c451e..adcec2d 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -17,6 +17,7 @@
 
 #include "runtime/sorted_run_merger.h"
 
+#include <condition_variable>
 #include <vector>
 
 #include "exprs/expr.h"
@@ -25,6 +26,7 @@
 #include "runtime/sorter.h"
 #include "runtime/tuple_row.h"
 #include "util/debug_util.h"
+#include "util/defer_op.h"
 #include "util/runtime_profile.h"
 
 using std::vector;
@@ -46,10 +48,10 @@ public:
               _input_row_batch_index(-1),
               _parent(parent) {}
 
-    ~BatchedRowSupplier() {}
+    virtual ~BatchedRowSupplier() = default;
 
     // Retrieves the first batch of sorted rows from the run.
-    Status init(bool* done) {
+    virtual Status init(bool* done) {
         *done = false;
         RETURN_IF_ERROR(_sorted_run(&_input_row_batch));
         if (_input_row_batch == NULL) {
@@ -62,7 +64,7 @@ public:
 
     // Increment the current row index. If the current input batch is exhausted fetch the
     // next one from the sorted run. Transfer ownership to transfer_batch if not NULL.
-    Status next(RowBatch* transfer_batch, bool* done) {
+    virtual Status next(RowBatch* transfer_batch, bool* done) {
         DCHECK(_input_row_batch != NULL);
         ++_input_row_batch_index;
         if (_input_row_batch_index < _input_row_batch->num_rows()) {
@@ -83,7 +85,9 @@ public:
 
     TupleRow* current_row() const { return _input_row_batch->get_row(_input_row_batch_index); }
 
-private:
+    RowBatch* get_row_batch() const { return _input_row_batch; }
+
+protected:
     friend class SortedRunMerger;
 
     // The run from which this object supplies rows.
@@ -99,6 +103,102 @@ private:
     SortedRunMerger* _parent;
 };
 
+// Row supplier for parallel merge: a dedicated thread prefetches the next batch of the sorted
+// run into a backup slot while the merger is still consuming the current batch.
+class SortedRunMerger::ParallelBatchedRowSupplier : public SortedRunMerger::BatchedRowSupplier {
+public:
+    // Construct an instance from a sorted input run.
+    ParallelBatchedRowSupplier(SortedRunMerger* parent, const RunBatchSupplier& sorted_run)
+            : BatchedRowSupplier(parent, sorted_run) {}
+
+    ~ParallelBatchedRowSupplier() {
+        // The merge can stop early (e.g. a limit clause): stop the pull thread and wait for it.
+        // The flags change under _mutex so the wake-up cannot be lost before the thread waits.
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            _cancel = true;
+            _backup_ready = false;
+        }
+        _batch_prepared_cv.notify_one();
+        // No thread exists if init() was never called.
+        if (_pull_task_thread.joinable()) _pull_task_thread.join();
+
+        delete _input_row_batch;
+        delete _input_row_batch_backup;
+    }
+
+    // Starts the pull thread and waits for the first batch of sorted rows from the run.
+    Status init(bool* done) override {
+        *done = false;
+        _pull_task_thread = std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task, this);
+        return next(NULL, done);
+    }
+
+    // Advance to the next row; when the current batch is exhausted, transfer its resources to
+    // transfer_batch (if not NULL) and switch to the prefetched batch. Returns any fetch error.
+    Status next(RowBatch* transfer_batch, bool* done) override {
+        ++_input_row_batch_index;
+        if (_input_row_batch && _input_row_batch_index < _input_row_batch->num_rows()) {
+            *done = false;
+            return Status::OK();
+        }
+        ScopedTimer<MonotonicStopWatch> timer(_parent->_get_next_batch_timer);
+        if (_input_row_batch && transfer_batch != NULL) {
+            _input_row_batch->transfer_resource_ownership(transfer_batch);
+        }
+        // release the mem of child merge
+        delete _input_row_batch;
+
+        std::unique_lock<std::mutex> lock(_mutex);
+        _batch_prepared_cv.wait(lock, [this]() { return _backup_ready.load(); });
+        // Take the prefetched batch and hand the backup slot back to the pull thread.
+        _input_row_batch = _input_row_batch_backup;
+        _input_row_batch_index = 0;
+        _input_row_batch_backup = nullptr;
+        _backup_ready = false;
+        DCHECK(_input_row_batch == nullptr || _input_row_batch->num_rows() > 0);
+        *done = _input_row_batch == nullptr;
+        _batch_prepared_cv.notify_one();
+        // Report a fetch failure instead of presenting it as a normal end of stream.
+        return _status_backup;
+    }
+
+private:
+    // Batch prefetched from _sorted_run; nullptr when nothing is pending or the run ended.
+    RowBatch* _input_row_batch_backup = nullptr;
+    // True while the backup slot holds a fetch result that next() has not consumed yet.
+    std::atomic_bool _backup_ready{false};
+    // Set by the destructor to stop the pull thread.
+    std::atomic_bool _cancel{false};
+    std::thread _pull_task_thread;
+    // Status of the most recent fetch from _sorted_run.
+    Status _status_backup;
+    // Held by the pull thread except while it waits on _batch_prepared_cv.
+    std::mutex _mutex;
+    // signal of new batch or the eos/cancelled condition
+    std::condition_variable _batch_prepared_cv;
+
+    // Pull-thread body: fetch one batch whenever the backup slot is free, until the run is
+    // exhausted, a fetch fails, or the supplier is cancelled.
+    void process_sorted_run_task() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        while (true) {
+            _batch_prepared_cv.wait(lock, [this]() { return !_backup_ready.load(); });
+            if (_cancel) {
+                break;
+            }
+            // do merge from sender queue data
+            _status_backup = _sorted_run(&_input_row_batch_backup);
+            if (!_status_backup.ok()) _input_row_batch_backup = nullptr;
+            _backup_ready = true;
+            DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); });
+            if (!_status_backup.ok() || _input_row_batch_backup == nullptr || _cancel) {
+                break;
+            }
+        }
+    }
+};
+
 void SortedRunMerger::heapify(int parent_index) {
     int left_index = 2 * parent_index + 1;
     int right_index = left_index + 1;
@@ -134,11 +234,12 @@ SortedRunMerger::SortedRunMerger(const TupleRowComparator& compare_less_than,
     _get_next_batch_timer = ADD_TIMER(profile, "MergeGetNextBatch");
 }
 
-Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs) {
+Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs, bool parallel) {
     DCHECK_EQ(_min_heap.size(), 0);
     _min_heap.reserve(input_runs.size());
     BOOST_FOREACH (const RunBatchSupplier& input_run, input_runs) {
-        BatchedRowSupplier* new_elem = _pool.add(new BatchedRowSupplier(this, input_run));
+        BatchedRowSupplier* new_elem = _pool.add(
+        	parallel ? new ParallelBatchedRowSupplier(this, input_run) : new BatchedRowSupplier(this, input_run));
         DCHECK(new_elem != NULL);
         bool empty = false;
         RETURN_IF_ERROR(new_elem->init(&empty));
@@ -155,6 +256,15 @@ Status SortedRunMerger::prepare(const vector<RunBatchSupplier>& input_runs) {
     return Status::OK();
 }
 
+void SortedRunMerger::transfer_all_resources(class doris::RowBatch * transfer_resource_batch) {
+    // Hand the resources of every input batch still held by a heap entry to the output batch.
+    for (BatchedRowSupplier* supplier : _min_heap) {
+        RowBatch* const pending = supplier->get_row_batch();
+        if (pending == nullptr) continue;
+        pending->transfer_resource_ownership(transfer_resource_batch);
+    }
+}
+
 Status SortedRunMerger::get_next(RowBatch* output_batch, bool* eos) {
     ScopedTimer<MonotonicStopWatch> timer(_get_next_timer);
     if (_min_heap.empty()) {
@@ -195,4 +305,35 @@ Status SortedRunMerger::get_next(RowBatch* output_batch, bool* eos) {
     return Status::OK();
 }
 
+ChildSortedRunMerger::ChildSortedRunMerger(const TupleRowComparator& compare_less_than,
+                                           RowDescriptor* row_desc, RuntimeProfile* profile,
+                                           MemTracker* parent, uint32_t row_batch_size,
+                                           bool deep_copy_input)
+        : SortedRunMerger(compare_less_than, row_desc, profile, deep_copy_input),
+          _eos(false),
+          _parent(parent),
+          _row_batch_size(row_batch_size) {
+    // Point the inherited timers at child-specific counters so that child merge time is
+    // reported under its own names in the profile.
+    _get_next_timer = ADD_TIMER(profile, "ChildMergeGetNext");
+    _get_next_batch_timer = ADD_TIMER(profile, "ChildMergeGetNextBatch");
+}
+
+Status ChildSortedRunMerger::get_batch(RowBatch** output_batch) {
+    // Yields the next merged batch; *output_batch is nullptr when no rows are available.
+    *output_batch = nullptr;
+    if (!_eos) {
+        _current_row_batch.reset(new RowBatch(*_input_row_desc, _row_batch_size, _parent));
+        bool reached_end = false;
+        RETURN_IF_ERROR(get_next(_current_row_batch.get(), &reached_end));
+        // Only a non-empty batch is released to the caller; an empty one stays owned here.
+        const bool no_rows = UNLIKELY(_current_row_batch->num_rows() == 0);
+        if (!no_rows) {
+            *output_batch = _current_row_batch.release();
+        }
+        _eos = reached_end;
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/runtime/sorted_run_merger.h b/be/src/runtime/sorted_run_merger.h
index ca27415..1c3e2d4 100644
--- a/be/src/runtime/sorted_run_merger.h
+++ b/be/src/runtime/sorted_run_merger.h
@@ -51,22 +51,28 @@ public:
     SortedRunMerger(const TupleRowComparator& compare_less_than, RowDescriptor* row_desc,
                     RuntimeProfile* profile, bool deep_copy_input);
 
-    ~SortedRunMerger() {}
+    virtual ~SortedRunMerger() = default;
 
     // Prepare this merger to merge and return rows from the sorted runs in 'input_runs'.
     // Retrieves the first batch from each run and sets up the binary heap implementing
     // the priority queue.
-    Status prepare(const std::vector<RunBatchSupplier>& input_runs);
+    Status prepare(const std::vector<RunBatchSupplier>& input_runs, bool parallel = false);
 
     // Return the next batch of sorted rows from this merger.
     Status get_next(RowBatch* output_batch, bool* eos);
 
+    // Returns the next batch of sorted rows. Only subclasses implement it; this base version returns InternalError.
+    virtual Status get_batch(RowBatch** output_batch) {
+        return Status::InternalError("no support method get_batch(RowBatch** output_batch)");
+    }
+
     // Called to finalize a merge when deep_copy is false. Transfers resources from
     // all input batches to the specified output batch.
     void transfer_all_resources(RowBatch* transfer_resource_batch);
 
-private:
+protected:
     class BatchedRowSupplier;
+    class ParallelBatchedRowSupplier;
 
     // Assuming the element at parent_index is the only out of place element in the heap,
     // restore the heap property (i.e. swap elements so parent <= children).
@@ -101,6 +107,28 @@ private:
     RuntimeProfile::Counter* _get_next_batch_timer;
 };
 
+class ChildSortedRunMerger: public SortedRunMerger {  // merger consumed through get_batch()
+public:
+    ChildSortedRunMerger(const TupleRowComparator& compare_less_than,
+        RowDescriptor* row_desc,
+        RuntimeProfile* profile,
+        MemTracker* _parent,
+        uint32_t row_batch_size,
+        bool deep_copy_input);
+    // Sets *output_batch to the next merged batch, or to nullptr when no rows are produced.
+    Status get_batch(RowBatch** output_batch) override;
+private:
+    // Batch currently being filled by get_batch(); owning it here prevents a leak on error.
+    std::unique_ptr<RowBatch> _current_row_batch;
+
+    // Set once the underlying merge has reported end of stream.
+    bool _eos = false;
+    // MemTracker handed to every RowBatch that get_batch() allocates.
+    MemTracker* _parent;
+    // Size passed to every RowBatch that get_batch() allocates.
+    uint32_t _row_batch_size;
+};
+
 } // namespace doris
 
 #endif // DORIS_BE_SRC_RUNTIME_SORTED_RUN_MERGER_H
diff --git a/docs/en/administrator-guide/running-profile.md b/docs/en/administrator-guide/running-profile.md
index 4002f73..13f4625 100644
--- a/docs/en/administrator-guide/running-profile.md
+++ b/docs/en/administrator-guide/running-profile.md
@@ -117,6 +117,10 @@ There are many statistical information collected at BE.  so we list the correspo
 #### `EXCHANGE_NODE`
   - BytesReceived: Size of bytes received by network
   - DataArrivalWaitTime: Total waiting time of sender to push data 
+  - MergeGetNext: When the lower-level nodes sort their output, the exchange node performs a unified merge sort and outputs an ordered result. This counter records the total time spent on merge sorting, including the time recorded by MergeGetNextBatch.
+  - MergeGetNextBatch: Time the merge node spends fetching data. For a single-level merge sort the data comes from the network queues; for a multi-level merge sort it comes from the child mergers.
+  - ChildMergeGetNext: When too many lower-level senders are sending data, a single-threaded merge becomes a performance bottleneck, so Doris starts multiple child merge threads to merge sort in parallel. This counter records the sorting time of the child mergers and is the cumulative value across those threads.
+  - ChildMergeGetNextBatch: Time the child mergers spend fetching data. If this value is too large, the bottleneck may be the lower-level nodes that send the data.
   - FirstBatchArrivalWaitTime: The time waiting for the first batch come from sender
   - DeserializeRowBatchTimer: Time consuming to receive data deserialization
   - SendersBlockedTotalTimer(*): When the DataStreamRecv's queue buffer is full, wait time of sender
diff --git a/docs/en/administrator-guide/variables.md b/docs/en/administrator-guide/variables.md
index a587319..1325117 100644
--- a/docs/en/administrator-guide/variables.md
+++ b/docs/en/administrator-guide/variables.md
@@ -382,3 +382,9 @@ Note that the comment must start with /*+ and can only follow the SELECT.
 
     When execute insert statement, doris will wait for the transaction to commit and visible after the import is completed.
     This parameter controls the timeout of waiting for transaction to be visible. The default value is 10000, and the minimum value is 1000.
+
+*  `enable_exchange_node_parallel_merge`
+
+    In a sort query, when an upper-level node receives ordered data from lower-level nodes, it merge-sorts that data on the exchange node to ensure that the final result is ordered. However, when a single thread merges many streams of data and the amount of data is large, the merge on the exchange node becomes a single-point bottleneck.
+
+    Doris optimizes this case: if there are too many lower-level data nodes, the exchange node starts multiple threads to merge in parallel and speed up the sorting process. This parameter is false by default, which means that the exchange node does not use parallel merge sort, avoiding the extra CPU and memory consumption.
diff --git a/docs/zh-CN/administrator-guide/running-profile.md b/docs/zh-CN/administrator-guide/running-profile.md
index fe13a72..697254a 100644
--- a/docs/zh-CN/administrator-guide/running-profile.md
+++ b/docs/zh-CN/administrator-guide/running-profile.md
@@ -115,6 +115,10 @@ BE端收集的统计信息较多,下面列出了各个参数的对应含义:
 
 #### `EXCHANGE_NODE`
   - BytesReceived: 通过网络接收的数据量大小
+  - MergeGetNext: 当下层节点存在排序时,会在EXCHANGE NODE进行统一的归并排序,输出有序结果。该指标记录了Merge排序的总耗时,包含了MergeGetNextBatch耗时。
+  - MergeGetNextBatch:Merge节点取数据的耗时,如果为单层Merge排序,则取数据的对象为网络队列。若为多层Merge排序取数据对象为Child Merger。
+  - ChildMergeGetNext: 当下层的发送数据的Sender过多时,单线程的Merge会成为性能瓶颈,Doris会启动多个Child Merge线程并行归并排序。记录了Child Merge的排序耗时  该数值是多个线程的累加值。
+  - ChildMergeGetNextBatch: Child Merge节点取数据的耗时,如果耗时过大,可能的瓶颈为下层的数据发送节点。
   - DataArrivalWaitTime: 等待Sender发送数据的总时间
   - FirstBatchArrivalWaitTime: 等待第一个batch从Sender获取的时间
   - DeserializeRowBatchTimer: 反序列化网络数据的耗时
diff --git a/docs/zh-CN/administrator-guide/variables.md b/docs/zh-CN/administrator-guide/variables.md
index 5b2322f..79e263a 100644
--- a/docs/zh-CN/administrator-guide/variables.md
+++ b/docs/zh-CN/administrator-guide/variables.md
@@ -71,8 +71,6 @@ SET GLOBAL exec_mem_limit = 137438953472
 * `query_timeout`
 * `exec_mem_limit`
 * `batch_size`
-* `parallel_fragment_exec_instance_num`
-* `parallel_exchange_instance_num`
 * `allow_partition_column_nullable`
 * `insert_visible_timeout_ms`
 
@@ -379,3 +377,9 @@ SELECT /*+ SET_VAR(query_timeout = 1) */ sleep(3);
 * `insert_visible_timeout_ms`
 
     在执行insert语句时,导入动作(查询和插入)完成后,还需要等待事务提交,使数据可见。此参数控制等待数据可见的超时时间,默认为10000,最小为1000。
+    
+*  `enable_exchange_node_parallel_merge`
+
+    在一个排序的查询之中,一个上层节点接收下层节点有序数据时,会在exchange node上进行对应的排序来保证最终的数据是有序的。但是单线程进行多路数据归并时,如果数据量过大,会导致exchange node的单点的归并瓶颈。
+
+    Doris在这部分进行了优化处理,如果下层的数据节点过多。exchange node会启动多线程进行并行归并来加速排序过程。该参数默认为False,即表示 exchange node 不采取并行的归并排序,来减少额外的CPU和内存消耗。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 09c8400..40bc2bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -77,6 +77,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
     public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
     public static final String ENABLE_SPILLING = "enable_spilling";
+    public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
     public static final String PREFER_JOIN_METHOD = "prefer_join_method";
 
     public static final String ENABLE_ODBC_TRANSCATION = "enable_odbc_transcation";
@@ -133,6 +134,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SPILLING)
     public boolean enableSpilling = false;
 
+    @VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
+    public boolean enableExchangeNodeParallelMerge = false;
+
     // query timeout in second.
     @VariableMgr.VarAttr(name = QUERY_TIMEOUT)
     public int queryTimeoutS = 300;
@@ -617,7 +621,9 @@ public class SessionVariable implements Serializable, Writable {
         if (maxPushdownConditionsPerColumn > -1) {
             tResult.setMaxPushdownConditionsPerColumn(maxPushdownConditionsPerColumn);
         }
+
         tResult.setEnableSpilling(enableSpilling);
+        tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 0263934..c320a98 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -138,6 +138,8 @@ struct TQueryOptions {
   30: optional i32 max_pushdown_conditions_per_column
   // whether enable spilling to disk
   31: optional bool enable_spilling = false;
+  // whether enable parallel merge in exchange node
+  32: optional bool enable_enable_exchange_node_parallel_merge = false;
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org