You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/04/12 10:35:20 UTC
[impala] 03/03: IMPALA-10583: Fix bug on unbounded result spooling
memory config.
This is an automated email from the ASF dual-hosted git repository.
laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e01b6312593197749a17dd6895f976c35eac5769
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Fri Mar 12 09:37:41 2021 -0800
IMPALA-10583: Fix bug on unbounded result spooling memory config.
Two bugs occur in result spooling when two of its query
options are set to 0 (unbounded).
The first bug happens if max_spilled_result_spooling_mem =
0 (unbounded). max_unpinned_bytes_ in SpillableRowBatchQueue will be set
to 0, SpillableRowBatchQueue::IsFull() will then always return true, and
the query hangs. This patch fixes it by setting max_unpinned_bytes_ to
INT64_MAX if max_spilled_result_spooling_mem = 0.
The second bug happens if we set max_result_spooling_mem =
0 (unbounded). PlanRootSink.java will peg maxMemReservationBytes to
always equal minMemReservationBytes. This patch fixes this by
reverting to the default max_result_spooling_mem (100MB).
Testing:
- Add test_unbounded_result_spooling_mem.
- Pass core tests.
Change-Id: If8f5e3668281bba8813f8082f45b4faa7721530e
Reviewed-on: http://gerrit.cloudera.org:8080/17187
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/buffered-plan-root-sink.cc | 8 +++++---
be/src/runtime/spillable-row-batch-queue.cc | 5 ++++-
.../org/apache/impala/planner/PlanRootSink.java | 12 +++++++++--
tests/query_test/test_result_spooling.py | 24 ++++++++++++++++++++++
4 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index daaa97e..1cb148b 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -50,9 +50,11 @@ Status BufferedPlanRootSink::Open(RuntimeState* state) {
RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_OPEN"));
RETURN_IF_ERROR(DataSink::Open(state));
current_batch_ = make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
- batch_queue_.reset(new SpillableRowBatchQueue(name_,
- state->query_options().max_spilled_result_spooling_mem, state, mem_tracker(),
- profile(), row_desc_, resource_profile_, debug_options_));
+ int64_t max_spilled_mem = state->query_options().max_spilled_result_spooling_mem;
+ // max_spilled_result_spooling_mem = 0 means unbounded.
+ if (max_spilled_mem <= 0) max_spilled_mem = INT64_MAX;
+ batch_queue_.reset(new SpillableRowBatchQueue(name_, max_spilled_mem, state,
+ mem_tracker(), profile(), row_desc_, resource_profile_, debug_options_));
RETURN_IF_ERROR(batch_queue_->Open());
return Status::OK();
}
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
index eb02900..ce5a4fd 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -37,7 +37,10 @@ SpillableRowBatchQueue::SpillableRowBatchQueue(const string& name,
row_desc_(row_desc),
resource_profile_(resource_profile),
debug_options_(debug_options),
- max_unpinned_bytes_(max_unpinned_bytes) {}
+ max_unpinned_bytes_(max_unpinned_bytes) {
+ DCHECK_GT(max_unpinned_bytes_, 0);
+ DCHECK_GT(resource_profile_.spillable_buffer_size, 0);
+}
SpillableRowBatchQueue::~SpillableRowBatchQueue() {
DCHECK(closed_);
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index b4a41b0..53887b7 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -107,12 +107,20 @@ public class PlanRootSink extends DataSink {
return;
}
+ long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
+ if (maxSpoolingMem <= 0) {
+ // max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited
+ // memory reservation, we fallback to the default max_result_spooling_mem.
+ TQueryOptions defaults = new TQueryOptions();
+ maxSpoolingMem = defaults.getMax_result_spooling_mem();
+ queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
+ }
+
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
bufferSize, queryOptions.getMax_row_size());
long minMemReservationBytes = 2 * maxRowBufferSize;
- long maxMemReservationBytes = Math.max(
- queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
+ long maxMemReservationBytes = Math.max(maxSpoolingMem, minMemReservationBytes);
// User might set query option scratch_limit that is lower than either of
// minMemReservationBytes, maxMemReservationBytes, or
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 04aa9f0..a2db1bc 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -488,6 +488,30 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
exec_options['default_spillable_buffer_size'] = 8 * 1024
self.__run_small_spilling_query(exec_options, "70.00 KB")
+ def test_unbounded_result_spooling_mem(self, vector):
+ """Test result spooling against unbounded MAX_RESULT_SPOOLING_MEM and
+ MAX_SPILLED_RESULT_SPOOLING_MEM. In this situation, planner should override
+ MAX_RESULT_SPOOLING_MEM to its default (100MB) and BufferedPlanRootSink should
+ assume MAX_SPILLED_RESULT_SPOOLING_MEM = INT64_MAX."""
+ exec_options = vector.get_value('exec_option')
+ exec_options['debug_action'] = vector.get_value('debug_action')
+ exec_options['spool_query_results'] = 'true'
+ exec_options['max_row_size'] = 8 * 1024
+ exec_options['max_result_spooling_mem'] = 0
+ exec_options['max_spilled_result_spooling_mem'] = 0
+ exec_options['default_spillable_buffer_size'] = 8 * 1024
+
+ query = "select * from functional.alltypes order by id limit 1500"
+ result = self.execute_query(query, exec_options)
+ assert result.success, "Failed to run {0} when result spooling is enabled" \
+ .format(query)
+
+ # Check that PLAN_ROOT_SINK's reservation limit match the default
+ # MAX_RESULT_SPOOLING_MEM.
+ plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+ .format('100.00 MB')
+ assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
+
def __run_small_spilling_query(self, exec_options, expected_limit):
"""Given an exec_options, test that simple query below spills and PLAN_ROOT_SINK's
ReservationLimit match with the expected_limit"""