You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/04/12 10:35:20 UTC

[impala] 03/03: IMPALA-10583: Fix bug on unbounded result spooling memory config.

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e01b6312593197749a17dd6895f976c35eac5769
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Fri Mar 12 09:37:41 2021 -0800

    IMPALA-10583: Fix bug on unbounded result spooling memory config.
    
    Two bugs happening on result spooling when we set two of its query
    options to 0 (unbounded).
    
    The first bug happens if max_spilled_result_spooling_mem =
    0 (unbounded). max_unpinned_bytes_ in SpillableRowBatchQueue will be set
    to 0, SpillableRowBatchQueue::IsFull() then will always return true, and
    the query hang. This patch fix it by setting max_unpinned_bytes_ to
    INT64_MAX if max_spilled_result_spooling_mem = 0.
    
    The second bug happens if we set max_result_spooling_mem =
    0 (unbounded). PlanRootSink.java will peg maxMemReservationBytes to
    always equal to minMemReservationBytes. This patch fixes this by
    reverting to the default max_result_spooling_mem (100MB).
    
    Testing:
    - Add test_unbounded_result_spooling_mem.
    - Pass core tests.
    
    Change-Id: If8f5e3668281bba8813f8082f45b4faa7721530e
    Reviewed-on: http://gerrit.cloudera.org:8080/17187
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc             |  8 +++++---
 be/src/runtime/spillable-row-batch-queue.cc        |  5 ++++-
 .../org/apache/impala/planner/PlanRootSink.java    | 12 +++++++++--
 tests/query_test/test_result_spooling.py           | 24 ++++++++++++++++++++++
 4 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index daaa97e..1cb148b 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -50,9 +50,11 @@ Status BufferedPlanRootSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_OPEN"));
   RETURN_IF_ERROR(DataSink::Open(state));
   current_batch_ = make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
-  batch_queue_.reset(new SpillableRowBatchQueue(name_,
-      state->query_options().max_spilled_result_spooling_mem, state, mem_tracker(),
-      profile(), row_desc_, resource_profile_, debug_options_));
+  int64_t max_spilled_mem = state->query_options().max_spilled_result_spooling_mem;
+  // max_spilled_result_spooling_mem = 0 means unbounded.
+  if (max_spilled_mem <= 0) max_spilled_mem = INT64_MAX;
+  batch_queue_.reset(new SpillableRowBatchQueue(name_, max_spilled_mem, state,
+      mem_tracker(), profile(), row_desc_, resource_profile_, debug_options_));
   RETURN_IF_ERROR(batch_queue_->Open());
   return Status::OK();
 }
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
index eb02900..ce5a4fd 100644
--- a/be/src/runtime/spillable-row-batch-queue.cc
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -37,7 +37,10 @@ SpillableRowBatchQueue::SpillableRowBatchQueue(const string& name,
     row_desc_(row_desc),
     resource_profile_(resource_profile),
     debug_options_(debug_options),
-    max_unpinned_bytes_(max_unpinned_bytes) {}
+    max_unpinned_bytes_(max_unpinned_bytes) {
+  DCHECK_GT(max_unpinned_bytes_, 0);
+  DCHECK_GT(resource_profile_.spillable_buffer_size, 0);
+}
 
 SpillableRowBatchQueue::~SpillableRowBatchQueue() {
   DCHECK(closed_);
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index b4a41b0..53887b7 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -107,12 +107,20 @@ public class PlanRootSink extends DataSink {
         return;
       }
 
+      long maxSpoolingMem = queryOptions.getMax_result_spooling_mem();
+      if (maxSpoolingMem <= 0) {
+        // max_result_spooling_mem = 0 means unbounded. But instead of setting unlimited
+        // memory reservation, we fallback to the default max_result_spooling_mem.
+        TQueryOptions defaults = new TQueryOptions();
+        maxSpoolingMem = defaults.getMax_result_spooling_mem();
+        queryOptions.setMax_result_spooling_mem(maxSpoolingMem);
+      }
+
       long bufferSize = queryOptions.getDefault_spillable_buffer_size();
       long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
           bufferSize, queryOptions.getMax_row_size());
       long minMemReservationBytes = 2 * maxRowBufferSize;
-      long maxMemReservationBytes = Math.max(
-          queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
+      long maxMemReservationBytes = Math.max(maxSpoolingMem, minMemReservationBytes);
 
       // User might set query option scratch_limit that is lower than either of
       // minMemReservationBytes, maxMemReservationBytes, or
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 04aa9f0..a2db1bc 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -488,6 +488,30 @@ class TestResultSpoolingMaxReservation(ImpalaTestSuite):
     exec_options['default_spillable_buffer_size'] = 8 * 1024
     self.__run_small_spilling_query(exec_options, "70.00 KB")
 
+  def test_unbounded_result_spooling_mem(self, vector):
+    """Test result spooling against unbounded MAX_RESULT_SPOOLING_MEM and
+    MAX_SPILLED_RESULT_SPOOLING_MEM. In this situation, planner should override
+    MAX_RESULT_SPOOLING_MEM to its default (100MB) and BufferedPlanRootSink should
+    assume MAX_SPILLED_RESULT_SPOOLING_MEM = INT64_MAX."""
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = vector.get_value('debug_action')
+    exec_options['spool_query_results'] = 'true'
+    exec_options['max_row_size'] = 8 * 1024
+    exec_options['max_result_spooling_mem'] = 0
+    exec_options['max_spilled_result_spooling_mem'] = 0
+    exec_options['default_spillable_buffer_size'] = 8 * 1024
+
+    query = "select * from functional.alltypes order by id limit 1500"
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is enabled" \
+        .format(query)
+
+    # Check that PLAN_ROOT_SINK's reservation limit match the default
+    # MAX_RESULT_SPOOLING_MEM.
+    plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: {0}" \
+        .format('100.00 MB')
+    assert re.search(plan_root_sink_reservation_limit, result.runtime_profile)
+
   def __run_small_spilling_query(self, exec_options, expected_limit):
     """Given an exec_options, test that simple query below spills and PLAN_ROOT_SINK's
     ReservationLimit match with the expected_limit"""