You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/09 01:30:39 UTC
[incubator-doris] branch master updated: [Spill] Fix "memory limit exceeded" errors when the analytic eval node needs to spill to disk under a low memory limit (#3991)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new fafc7e4 [Spill] Fix "memory limit exceeded" errors when the analytic eval node needs to spill to disk under a low memory limit (#3991)
fafc7e4 is described below
commit fafc7e406efc2f47961b18ed47e08a540f465a28
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Thu Jul 9 09:30:22 2020 +0800
[Spill] Fix "memory limit exceeded" errors when the analytic eval node needs to spill to disk under a low memory limit (#3991)
[Bug] Fix "memory limit exceeded" errors when the analytic eval node needs to spill to disk under a low memory limit.
Also release the analytic eval node's block manager reservations (via clear_reservations) when the node is closed.
[Running Profile] Add a "Spilled" flag to the runtime profile when the analytic eval node or the sort node spills to disk.
---
be/src/exec/analytic_eval_node.cpp | 19 ++++++++++++++-----
be/src/exec/analytic_eval_node.h | 2 +-
be/src/exec/spill_sort_node.cc | 3 +++
be/src/runtime/spill_sorter.cc | 6 ++++--
be/src/runtime/spill_sorter.h | 5 +++++
5 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 86db457..400faa5 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -25,8 +25,6 @@
#include "runtime/runtime_state.h"
#include "udf/udf_internal.h"
-static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
-
namespace doris {
using doris_udf::BigIntVal;
@@ -53,6 +51,7 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
_dummy_result_tuple(NULL),
_curr_partition_idx(-1),
_prev_input_row(NULL),
+ _block_mgr_client(nullptr),
_input_eos(false),
_evaluation_timer(NULL) {
if (tnode.analytic_node.__isset.buffered_tuple_id) {
@@ -194,8 +193,8 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->open(state));
- RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &client_));
- _input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), client_, false, true));
+ RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &_block_mgr_client));
+ _input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), _block_mgr_client, false, true));
RETURN_IF_ERROR(_input_stream->init(id(), runtime_profile(), true));
bool got_read_buffer;
@@ -689,6 +688,7 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) {
// not enough memory (status() is OK). If there isn't enough memory, we unpin
// the stream and continue writing/reading in unpinned mode.
// TODO: Consider re-pinning later if the output stream is fully consumed.
+ add_runtime_exec_option("Spilled");
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(_input_stream->unpin_stream());
VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
@@ -710,7 +710,12 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) {
// Transfer resources to _prev_tuple_pool when enough resources have accumulated
// and the _prev_tuple_pool has already been transfered to an output batch.
- if (_curr_tuple_pool->total_allocated_bytes() > MAX_TUPLE_POOL_SIZE &&
+
+ // The memory limit of _curr_tuple_pool is set by the fixed value
+ // The size is specified as 8MB, which is used in the extremely strict memory limit.
+ // Eg: exec_mem_limit < 100MB may cause memory exeecded limit problem. So change it to half of max block size to prevent the problem.
+ // TODO: Should we keep the buffer of _curr_tuple_pool or release the memory occupied ASAP?
+ if (_curr_tuple_pool->total_allocated_bytes() > state->block_mgr2()->max_block_size() / 2 &&
(_prev_pool_last_result_idx == -1 || _prev_pool_last_window_idx == -1)) {
_prev_tuple_pool->acquire_data(_curr_tuple_pool.get(), false);
_prev_pool_last_result_idx = _last_result_idx;
@@ -811,6 +816,7 @@ Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
+ RETURN_IF_ERROR(state->check_query_state("Analytic eval, while get_next."));
VLOG_FILE << id() << " GetNext: " << debug_state_string(false);
if (reached_limit()) {
@@ -855,6 +861,9 @@ Status AnalyticEvalNode::close(RuntimeState* state) {
_input_stream->close();
}
+ if (_block_mgr_client != nullptr) {
+ state->block_mgr2()->clear_reservations(_block_mgr_client);
+ }
// Close all evaluators and fn ctxs. If an error occurred in Init or rrepare there may
// be fewer ctxs than evaluators. We also need to Finalize if _curr_tuple was created
// in Open.
diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h
index 52516f6..f15c84f 100644
--- a/be/src/exec/analytic_eval_node.h
+++ b/be/src/exec/analytic_eval_node.h
@@ -308,7 +308,7 @@ private:
boost::scoped_ptr<RowBatch> _curr_child_batch;
// Block manager client used by _input_stream. Not owned.
- BufferedBlockMgr2::Client* client_;
+ BufferedBlockMgr2::Client* _block_mgr_client;
// Buffers input rows added in process_child_batch() until enough rows are able to
// be returned by get_next_output_batch(), in which case row batches are returned from
diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc
index 907c463..2d49364 100644
--- a/be/src/exec/spill_sort_node.cc
+++ b/be/src/exec/spill_sort_node.cc
@@ -168,6 +168,9 @@ Status SpillSortNode::sort_input(RuntimeState* state) {
} while (!eos);
RETURN_IF_ERROR(_sorter->input_done());
+ if (_sorter->is_spilled()) {
+ add_runtime_exec_option("Spilled");
+ }
return Status::OK();
}
diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc
index 3df7312..b331731 100644
--- a/be/src/runtime/spill_sorter.cc
+++ b/be/src/runtime/spill_sorter.cc
@@ -1047,7 +1047,8 @@ SpillSorter::SpillSorter(const TupleRowComparator& compare_less_than,
_initial_runs_counter(NULL),
_num_merges_counter(NULL),
_in_mem_sort_timer(NULL),
- _sorted_data_size(NULL) {
+ _sorted_data_size(NULL),
+ _spilled(false){
}
SpillSorter::~SpillSorter() {
@@ -1109,6 +1110,7 @@ Status SpillSorter::add_batch(RowBatch* batch) {
// The current run is full. Sort it and begin the next one.
RETURN_IF_ERROR(sort_run());
RETURN_IF_ERROR(_sorted_runs.back()->unpin_all_blocks());
+ _spilled = true;
_unsorted_run = _obj_pool.add(
new Run(this, _output_row_desc->tuple_descriptors()[0], true));
RETURN_IF_ERROR(_unsorted_run->init());
@@ -1333,4 +1335,4 @@ Status SpillSorter::create_merger(int num_runs) {
return Status::OK();
}
-} // namespace impala
+} // namespace doris
diff --git a/be/src/runtime/spill_sorter.h b/be/src/runtime/spill_sorter.h
index 56b2df8..d8ddab6 100644
--- a/be/src/runtime/spill_sorter.h
+++ b/be/src/runtime/spill_sorter.h
@@ -118,6 +118,9 @@ public:
// may or may not have been called.
Status reset();
+ bool is_spilled() {
+ return _spilled;
+ }
// Estimate the memory overhead in bytes for an intermediate merge, based on the
// maximum number of memory buffers available for the sort, the row descriptor for
// the sorted tuples and the batch size used (in rows).
@@ -212,6 +215,8 @@ private:
RuntimeProfile::Counter* _num_merges_counter;
RuntimeProfile::Counter* _in_mem_sort_timer;
RuntimeProfile::Counter* _sorted_data_size;
+
+ bool _spilled;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org