You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/09 01:30:39 UTC

[incubator-doris] branch master updated: [Spill]Fix the problem of mem exec, when analytic eval node need to spill to disk with a low mem limit (#3991)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fafc7e4  [Spill]Fix the problem of mem exec, when analytic eval node need to spill to disk with a low mem limit (#3991)
fafc7e4 is described below

commit fafc7e406efc2f47961b18ed47e08a540f465a28
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Jul 9 09:30:22 2020 +0800

    [Spill]Fix the problem of mem exec, when analytic eval node need to spill to disk with a low mem limit (#3991)
    
    [Bug] Fix the memory-limit-exceeded problem that occurs when the analytic eval node needs to spill to disk under a low memory limit.
    Also clear the Analytic node's block manager reservations (clear_reservations) when the node is closed.
    [Running Profile] Add a "Spilled" flag to the running profile when the Analytic eval node or the sort node spills to disk.
---
 be/src/exec/analytic_eval_node.cpp | 19 ++++++++++++++-----
 be/src/exec/analytic_eval_node.h   |  2 +-
 be/src/exec/spill_sort_node.cc     |  3 +++
 be/src/runtime/spill_sorter.cc     |  6 ++++--
 be/src/runtime/spill_sorter.h      |  5 +++++
 5 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 86db457..400faa5 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -25,8 +25,6 @@
 #include "runtime/runtime_state.h"
 #include "udf/udf_internal.h"
 
-static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
-
 namespace doris {
 
 using doris_udf::BigIntVal;
@@ -53,6 +51,7 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
         _dummy_result_tuple(NULL),
         _curr_partition_idx(-1),
         _prev_input_row(NULL),
+        _block_mgr_client(nullptr),
         _input_eos(false),
         _evaluation_timer(NULL) {
     if (tnode.analytic_node.__isset.buffered_tuple_id) {
@@ -194,8 +193,8 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
     RETURN_IF_ERROR(child(0)->open(state));
-    RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &client_));
-    _input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), client_, false, true));
+    RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &_block_mgr_client));
+    _input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), _block_mgr_client, false, true));
     RETURN_IF_ERROR(_input_stream->init(id(), runtime_profile(), true));
 
     bool got_read_buffer;
@@ -689,6 +688,7 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) {
             // not enough memory (status() is OK). If there isn't enough memory, we unpin
             // the stream and continue writing/reading in unpinned mode.
             // TODO: Consider re-pinning later if the output stream is fully consumed.
+            add_runtime_exec_option("Spilled");
             RETURN_IF_ERROR(status);
             RETURN_IF_ERROR(_input_stream->unpin_stream());
             VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
@@ -710,7 +710,12 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) {
 
     // Transfer resources to _prev_tuple_pool when enough resources have accumulated
     // and the _prev_tuple_pool has already been transfered to an output batch.
-    if (_curr_tuple_pool->total_allocated_bytes() > MAX_TUPLE_POOL_SIZE &&
+
+    // The threshold for _curr_tuple_pool used to be a fixed value of 8MB,
+    // which is too large when running under an extremely strict memory limit.
+    // E.g. exec_mem_limit < 100MB may cause a memory-limit-exceeded problem, so use half of the max block size instead.
+    // TODO: Should we keep the buffer of _curr_tuple_pool or release the memory occupied ASAP?
+    if (_curr_tuple_pool->total_allocated_bytes() > state->block_mgr2()->max_block_size() / 2 &&
             (_prev_pool_last_result_idx == -1 || _prev_pool_last_window_idx == -1)) {
         _prev_tuple_pool->acquire_data(_curr_tuple_pool.get(), false);
         _prev_pool_last_result_idx = _last_result_idx;
@@ -811,6 +816,7 @@ Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
+    RETURN_IF_ERROR(state->check_query_state("Analytic eval, while get_next."));
     VLOG_FILE << id() << " GetNext: " << debug_state_string(false);
 
     if (reached_limit()) {
@@ -855,6 +861,9 @@ Status AnalyticEvalNode::close(RuntimeState* state) {
         _input_stream->close();
     }
 
+    if (_block_mgr_client != nullptr) {
+        state->block_mgr2()->clear_reservations(_block_mgr_client);
+    }
     // Close all evaluators and fn ctxs. If an error occurred in Init or rrepare there may
     // be fewer ctxs than evaluators. We also need to Finalize if _curr_tuple was created
     // in Open.
diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h
index 52516f6..f15c84f 100644
--- a/be/src/exec/analytic_eval_node.h
+++ b/be/src/exec/analytic_eval_node.h
@@ -308,7 +308,7 @@ private:
     boost::scoped_ptr<RowBatch> _curr_child_batch;
 
     // Block manager client used by _input_stream. Not owned.
-    BufferedBlockMgr2::Client* client_;
+    BufferedBlockMgr2::Client* _block_mgr_client;
 
     // Buffers input rows added in process_child_batch() until enough rows are able to
     // be returned by get_next_output_batch(), in which case row batches are returned from
diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc
index 907c463..2d49364 100644
--- a/be/src/exec/spill_sort_node.cc
+++ b/be/src/exec/spill_sort_node.cc
@@ -168,6 +168,9 @@ Status SpillSortNode::sort_input(RuntimeState* state) {
     } while (!eos);
 
     RETURN_IF_ERROR(_sorter->input_done());
+    if (_sorter->is_spilled()) {
+        add_runtime_exec_option("Spilled");
+    }
     return Status::OK();
 }
 
diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc
index 3df7312..b331731 100644
--- a/be/src/runtime/spill_sorter.cc
+++ b/be/src/runtime/spill_sorter.cc
@@ -1047,7 +1047,8 @@ SpillSorter::SpillSorter(const TupleRowComparator& compare_less_than,
     _initial_runs_counter(NULL),
     _num_merges_counter(NULL),
     _in_mem_sort_timer(NULL),
-    _sorted_data_size(NULL) {
+    _sorted_data_size(NULL),
+    _spilled(false){
 }
 
 SpillSorter::~SpillSorter() {
@@ -1109,6 +1110,7 @@ Status SpillSorter::add_batch(RowBatch* batch) {
             // The current run is full. Sort it and begin the next one.
             RETURN_IF_ERROR(sort_run());
             RETURN_IF_ERROR(_sorted_runs.back()->unpin_all_blocks());
+            _spilled = true;
             _unsorted_run = _obj_pool.add(
                     new Run(this, _output_row_desc->tuple_descriptors()[0], true));
             RETURN_IF_ERROR(_unsorted_run->init());
@@ -1333,4 +1335,4 @@ Status SpillSorter::create_merger(int num_runs) {
     return Status::OK();
 }
 
-} // namespace impala
+} // namespace doris 
diff --git a/be/src/runtime/spill_sorter.h b/be/src/runtime/spill_sorter.h
index 56b2df8..d8ddab6 100644
--- a/be/src/runtime/spill_sorter.h
+++ b/be/src/runtime/spill_sorter.h
@@ -118,6 +118,9 @@ public:
     // may or may not have been called.
     Status reset();
 
+    bool is_spilled() {
+        return _spilled;
+    }
     // Estimate the memory overhead in bytes for an intermediate merge, based on the
     // maximum number of memory buffers available for the sort, the row descriptor for
     // the sorted tuples and the batch size used (in rows).
@@ -212,6 +215,8 @@ private:
     RuntimeProfile::Counter* _num_merges_counter;
     RuntimeProfile::Counter* _in_mem_sort_timer;
     RuntimeProfile::Counter* _sorted_data_size;
+
+    bool _spilled;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org