You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/15 01:06:12 UTC

[impala] 01/02: IMPALA-10565: Adjust result spooling memory based on scratch_limit

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 47219ec3663fa8e27ea16b826425b139c94580da
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Mar 8 17:41:34 2021 -0800

    IMPALA-10565: Adjust result spooling memory based on scratch_limit
    
    IMPALA-9856 enables result spooling by default. Result spooling depends
    on the ability to spill its entire BufferedTupleStream to disk once it
    hits its maximum memory reservation. However, if the query option
    scratch_limit is set lower than max_spilled_result_spooling_mem, the
    query might fail in the middle of execution due to insufficient scratch
    space. This patch changes the planner to consider the scratch_limit
    query option and the scratch_dirs startup flag when computing the
    resources used by result spooling. The algorithm is as follows:
    
    * If scratch_dirs is empty or scratch_limit < minMemReservationBytes
      required to use BufferedPlanRootSink, we set spool_query_results to
      false and fall back to using BlockingPlanRootSink.
    
    * If scratch_limit > minMemReservationBytes but still fairly low, we
      lower the max_result_spooling_mem (default is 100MB) and
      max_spilled_result_spooling_mem (default is 1GB) to fit scratch_limit.
    
    * If scratch_limit > max_spilled_result_spooling_mem, do nothing.
    
    Testing:
    - Add TestScratchLimit::test_result_spooling_and_varying_scratch_limit
    - Verify that spool_query_results query option is disabled in
      TestScratchDir::test_no_dirs
    - Pass exhaustive tests.
    
    Change-Id: I541f46e6911694e14c0fc25be1a6982fd929d3a9
    Reviewed-on: http://gerrit.cloudera.org:8080/17166
    Reviewed-by: Aman Sinha <am...@cloudera.com>
    Tested-by: Aman Sinha <am...@cloudera.com>
---
 be/src/service/query-options-test.cc               |  3 +
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../org/apache/impala/planner/PlanRootSink.java    | 77 +++++++++++++++++++++-
 .../org/apache/impala/service/BackendConfig.java   |  2 +
 .../queries/QueryTest/scratch-limit.test           | 44 +++++++++++++
 tests/custom_cluster/test_scratch_disk.py          | 11 ++--
 tests/query_test/test_scratch_limit.py             |  8 +++
 8 files changed, 143 insertions(+), 6 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index a6d467d..0759f5c 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -155,6 +155,9 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(preagg_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(sort_run_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(targeted_kudu_scan_range_length), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(scratch_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(max_result_spooling_mem), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(max_spilled_result_spooling_mem), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index d20c94e..9d0daf6 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -90,6 +90,7 @@ DECLARE_bool(compact_catalog_topic);
 DECLARE_bool(enable_incremental_metadata_updates);
 DECLARE_int64(topic_update_tbl_max_wait_time_ms);
 DECLARE_int32(catalog_max_lock_skipped_topic_updates);
+DECLARE_string(scratch_dirs);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -268,6 +269,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_saml2_group_attribute_name(FLAGS_saml2_group_attribute_name);
   cfg.__set_saml2_group_filter(FLAGS_saml2_group_filter);
   cfg.__set_saml2_ee_test_mode(FLAGS_saml2_ee_test_mode);
+  cfg.__set_scratch_dirs(FLAGS_scratch_dirs);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3a8537c..f468755 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -189,4 +189,6 @@ struct TBackendGflags {
   82: required string saml2_group_filter
 
   83: required bool saml2_ee_test_mode
+
+  84: required string scratch_dirs
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 61d57df..b4a41b0 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -20,11 +20,13 @@ package org.apache.impala.planner;
 import java.util.List;
 
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanRootSink;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
 
@@ -35,6 +37,7 @@ import com.google.common.base.Preconditions;
  * client, despite both executing concurrently.
  */
 public class PlanRootSink extends DataSink {
+  private static final Logger LOG = Logger.getLogger(PlanRootSink.class);
 
   // The default estimated memory consumption is 10 mb. Only used if statistics are not
   // available. 10 mb should be sufficient to buffer results from most queries. See
@@ -69,8 +72,9 @@ public class PlanRootSink extends DataSink {
    * spooling is disabled, a ResourceProfile is returned with no reservation or buffer
    * sizes, and the estimated memory consumption is 0. Without result spooling, no rows
    * get buffered, and only a single RowBatch is passed to the client at a time. Given
-   * that RowBatch memory is currently unreserved, no reservation is necessary. If
-   * SPOOL_QUERY_RESULTS is true, then the ResourceProfile sets a min/max resevation,
+   * that RowBatch memory is currently unreserved, no reservation is necessary.
+   *
+   * If SPOOL_QUERY_RESULTS is true, then the ResourceProfile sets a min/max reservation,
    * estimated memory consumption, max buffer size, and spillable buffer size. The
    * 'memEstimateBytes' (estimated memory consumption in bytes) is set by taking the
    * estimated number of input rows into the sink and multiplying it by the estimated
@@ -81,10 +85,28 @@ public class PlanRootSink extends DataSink {
    * nearest power of 2) to account for the read and write pages in the
    * BufferedTupleStream used by the backend plan-root-sink. The maximum reservation is
    * set to the query-level config MAX_PINNED_RESULT_SPOOLING_MEMORY.
+   *
+   * If SPOOL_QUERY_RESULTS is true but spill is disabled either due to SCRATCH_LIMIT = 0
+   * or SCRATCH_DIRS is empty, SPOOL_QUERY_RESULTS will be set to false. A ResourceProfile
+   * is returned with no reservation or buffer sizes, and the estimated memory consumption
+   * is 0.
    */
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     if (queryOptions.isSpool_query_results()) {
+      // Check if we need to disable result spooling because we can not spill.
+      long scratchLimit = queryOptions.getScratch_limit();
+      String scratchDirs = BackendConfig.INSTANCE.getScratchDirs();
+      if (scratchLimit == 0 || scratchDirs.isEmpty()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Result spooling is disabled due to unavailability of scratch "
+              + "space.");
+        }
+        queryOptions.setSpool_query_results(false);
+        resourceProfile_ = ResourceProfile.noReservation(0);
+        return;
+      }
+
       long bufferSize = queryOptions.getDefault_spillable_buffer_size();
       long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
           bufferSize, queryOptions.getMax_row_size());
@@ -92,6 +114,57 @@ public class PlanRootSink extends DataSink {
       long maxMemReservationBytes = Math.max(
           queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
 
+      // User might set query option scratch_limit that is lower than either of
+      // minMemReservationBytes, maxMemReservationBytes, or
+      // max_spilled_result_spooling_mem. We define:
+      //
+      //   maxAllowedScratchLimit = scratchLimit - maxRowBufferSize
+      //
+      // If maxAllowedScratchLimit < minMemReservationBytes, we fall back to use
+      // BlockingPlanRootSink in the backend by silently disabling result spooling.
+      // If maxAllowedScratchLimit < maxMemReservationBytes, we silently lower
+      // maxMemReservationBytes, max_result_spooling_mem, and
+      // max_spilled_result_spooling_mem accordingly to fit maxAllowedScratchLimit.
+      // Otherwise, do nothing.
+      //
+      // BufferedPlanRootSink may slightly exceed its maxMemReservationBytes when it
+      // decides to spill. maxRowBufferSize bytes is subtracted in maxAllowedScratchLimit
+      // to give extra space, ensuring spill does not exceed scratch_limit.
+      if (scratchLimit > -1) {
+        long maxAllowedScratchLimit = scratchLimit - maxRowBufferSize;
+        if (maxAllowedScratchLimit < minMemReservationBytes) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Result spooling is disabled due to low scratch_limit ("
+                + scratchLimit + "). Try increasing scratch_limit to >= "
+                + (minMemReservationBytes + maxRowBufferSize)
+                + " to enable result spooling.");
+          }
+          queryOptions.setSpool_query_results(false);
+          resourceProfile_ = ResourceProfile.noReservation(0);
+          return;
+        } else if (maxAllowedScratchLimit < maxMemReservationBytes) {
+          maxMemReservationBytes = maxAllowedScratchLimit;
+          queryOptions.setMax_result_spooling_mem(maxAllowedScratchLimit);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("max_result_spooling_mem is lowered to " + maxMemReservationBytes
+                + " to fit scratch_limit (" + scratchLimit + ").");
+          }
+        }
+
+        // If we got here, it means we can use BufferedPlanRootSink with at least
+        // minMemReservationBytes in memory. But the amount of memory we can spill to disk
+        // may still be limited by scratch_limit. Thus, we need to lower
+        // max_spilled_result_spooling_mem as necessary.
+        if (maxAllowedScratchLimit < queryOptions.getMax_spilled_result_spooling_mem()) {
+          queryOptions.setMax_spilled_result_spooling_mem(maxAllowedScratchLimit);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("max_spilled_result_spooling_mem is lowered to "
+                + maxAllowedScratchLimit + " to fit scratch_limit ("
+                + scratchLimit + ").");
+          }
+        }
+      }
+
       PlanNode inputNode = fragment_.getPlanRoot();
 
       long memEstimateBytes;
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 923b5ed..68a8665 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -246,6 +246,8 @@ public class BackendConfig {
 
   public boolean getSaml2EETestMode() { return backendCfg_.saml2_ee_test_mode; }
 
+  public String getScratchDirs() { return backendCfg_.scratch_dirs; }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scratch-limit.test b/testdata/workloads/functional-query/queries/QueryTest/scratch-limit.test
new file mode 100644
index 0000000..fed0155
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/scratch-limit.test
@@ -0,0 +1,44 @@
+====
+---- QUERY
+# Unbounded scratch_limit will not override default query options of result spooling.
+set scratch_limit=-1;
+select o_orderdate, o_custkey, o_comment from tpch.orders limit 100000;
+---- RUNTIME_PROFILE
+row_regex: .*set by configuration and planner.*
+row_regex: \|  mem-estimate=8.63MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+====
+---- QUERY
+# Result spooling should be disabled if scratch_limit is 0.
+set scratch_limit=0;
+select o_orderdate, o_custkey, o_comment from tpch.orders limit 100000;
+---- RUNTIME_PROFILE
+row_regex: .*set by configuration and planner.*SPOOL_QUERY_RESULTS=0
+row_regex: \|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+====
+---- QUERY
+# Result spooling should be disabled if scratch_limit is less than minimum memory
+# reservation required by result spooling (4MB).
+set scratch_limit=2m;
+select o_orderdate, o_custkey, o_comment from tpch.orders limit 100000;
+---- RUNTIME_PROFILE
+row_regex: .*set by configuration and planner.*SPOOL_QUERY_RESULTS=0
+row_regex: \|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+====
+---- QUERY
+# scratch_limit higher than minimum memory required by result spooling, but less than the
+# default MAX_RESULT_SPOOLING_MEM (100MB).
+set scratch_limit=7m;
+select o_orderdate, o_custkey, o_comment from tpch.orders limit 100000;
+---- RUNTIME_PROFILE
+row_regex: .*set by configuration and planner.*MAX_RESULT_SPOOLING_MEM=5242880,MAX_SPILLED_RESULT_SPOOLING_MEM=5242880
+row_regex: \|  mem-estimate=5.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+====
+---- QUERY
+# scratch_limit higher than minimum memory required by result spooling, but less than the
+# default MAX_SPILLED_RESULT_SPOOLING_MEM (1GB).
+set scratch_limit=200m;
+select o_orderdate, o_custkey, o_comment from tpch.orders limit 100000;
+---- RUNTIME_PROFILE
+row_regex: .*set by configuration and planner.*MAX_SPILLED_RESULT_SPOOLING_MEM=207618048
+row_regex: \|  mem-estimate=8.63MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 21f5c08..4c0f6b3 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -19,6 +19,7 @@
 
 import os
 import pytest
+import re
 import shutil
 import stat
 import tempfile
@@ -114,15 +115,17 @@ class TestScratchDir(CustomClusterTestSuite):
         "Running without spill to disk: no scratch directories provided\.")
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
-    # IMPALA-9856: Disable query result spooling so that in_mem_query does not spill to
-    # disk.
-    exec_option['spool_query_results'] = '0'
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     # Expect spill to disk to fail
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
-    self.execute_query_expect_success(client, self.in_mem_query, exec_option)
+    result = self.execute_query_expect_success(client, self.in_mem_query, exec_option)
+    # IMPALA-10565: Since scratch_dirs is empty, we expect planner to disable result
+    # spooling.
+    query_options_by_planner = ".*set by configuration and planner" \
+                               ".*SPOOL_QUERY_RESULTS=0"
+    assert re.search(query_options_by_planner, result.runtime_profile)
 
   @pytest.mark.execute_serially
   def test_non_writable_dirs(self, vector):
diff --git a/tests/query_test/test_scratch_limit.py b/tests/query_test/test_scratch_limit.py
index 9bf6114..8779cd3 100644
--- a/tests/query_test/test_scratch_limit.py
+++ b/tests/query_test/test_scratch_limit.py
@@ -126,3 +126,11 @@ class TestScratchLimit(ImpalaTestSuite):
     exec_option['scratch_limit'] = '0'
     for query in self.spilling_queries:
       self.execute_query_expect_success(self.client, query, exec_option)
+
+  def test_result_spooling_and_varying_scratch_limit(self, vector):
+    """
+    IMPALA-9856 made result spooling the default. Since result spooling depends on the
+    ability to spill to disk, query option scratch_limit may affect its memory
+    configuration. This test varies scratch_limit and verifies the memory adjustment.
+    """
+    self.run_test_case('QueryTest/scratch-limit', vector)