You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/10/10 02:01:39 UTC

[impala] branch master updated: IMPALA-9792: Add ability to split kudu scan ranges

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fd6f5b  IMPALA-9792: Add ability to split kudu scan ranges
2fd6f5b is described below

commit 2fd6f5bc5aa6b50e36547e52657c1117637384b6
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Aug 27 23:01:55 2020 -0700

    IMPALA-9792: Add ability to split kudu scan ranges
    
    This patch adds the ability to split kudu scan tokens via the provided
    kudu java API. A query option "TARGETED_KUDU_SCAN_RANGE_LENGTH" has
    been added to set the scan range length used in this implementation.
    
    Potential benefit:
    This helps increase parallelism during scanning which can
    result in more efficient use of CPU with higher mt_dop.
    
    Limitation:
    - The scan range length sent to kudu is just a hint and does not
      guarantee that the token will be split at that limit.
    - Comes at an added cost of an RPC to tablet server per token in
      order to split it. A slow tablet server which can already slow
      down scanning during execution can now also potentially slow
      down planning.
    - Also adds the cost of an RPC per token to open a new scanner for
      it on the kudu side. Therefore, scanning many smaller split
      tokens can slow down scanning and we can also lose benefits
      of scanning a single large token sequentially with a single scanner.
    
    Testing:
    - Added an e2e test
    
    Change-Id: Ia02fd94cc1d13c61bc6cb0765dd2cbe90e9a5ce8
    Reviewed-on: http://gerrit.cloudera.org:8080/16385
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |  1 +
 be/src/service/query-options.cc                    |  7 +++
 be/src/service/query-options.h                     |  4 +-
 common/thrift/ImpalaInternalService.thrift         |  4 ++
 common/thrift/ImpalaService.thrift                 |  7 +++
 .../org/apache/impala/planner/KuduScanNode.java    |  7 ++-
 tests/query_test/test_kudu.py                      | 59 ++++++++++++++++++++++
 7 files changed, 86 insertions(+), 3 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 710412c..8d80447 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -154,6 +154,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(broadcast_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(preagg_bytes_limit), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(sort_run_bytes_limit), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(targeted_kudu_scan_range_length), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index dcc16c4..63e1b73 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -965,6 +965,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
          query_options->__set_enable_outer_join_to_inner_transformation(IsTrue(value));
          break;
       }
+      case TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH: {
+              int64_t scan_length = 0;
+              RETURN_IF_ERROR(
+                  ParseMemValue(value, "targeted kudu scan range length", &scan_length));
+              query_options->__set_targeted_kudu_scan_range_length(scan_length);
+              break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index a56fb59..3b13f3b 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION + 1);\
+      TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -219,6 +219,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(enable_outer_join_to_inner_transformation,\
       ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 27716ed..bb22ab3 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -456,6 +456,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   114: optional bool enable_outer_join_to_inner_transformation = false;
+
+  // Initialized with -1 to indicate it is unspecified.
+  // See comment in ImpalaService.thrift
+  115: optional i64 targeted_kudu_scan_range_length = -1;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 09b74af..3e5e5af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -579,10 +579,17 @@ enum TImpalaQueryOptions {
 
   // When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will
   // be converted from UTC to local time. Writes are unaffected.
+
   CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS = 112
 
   // Indicates whether the FE should attempt to transform outer joins into inner joins.
   ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION = 113
+
+  // Set the target scan range length for scanning kudu tables (in bytes). This is
+  // used to split kudu scan tokens and is treated as a hint by kudu, so it does
+  // not guarantee a limit on the size of the scan range. If unspecified or set to
+  // 0, this feature is disabled.
+  TARGETED_KUDU_SCAN_RANGE_LENGTH = 114
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 4d54b7d..613c40b 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -214,7 +214,7 @@ public class KuduScanNode extends ScanNode {
       throws ImpalaRuntimeException {
     scanRangeSpecs_ = new TScanRangeSpec();
 
-    List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable);
+    List<KuduScanToken> scanTokens = createScanTokens(analyzer, client, rpcTable);
     for (KuduScanToken token: scanTokens) {
       LocatedTablet tablet = token.getTablet();
       List<TScanRangeLocation> locations = new ArrayList<>();
@@ -253,7 +253,7 @@ public class KuduScanNode extends ScanNode {
    * will be pushed to Kudu. The projected Kudu columns are ordered by offset in an
    * Impala tuple to make the Impala and Kudu tuple layouts identical.
    */
-  private List<KuduScanToken> createScanTokens(KuduClient client,
+  private List<KuduScanToken> createScanTokens(Analyzer analyzer, KuduClient client,
       org.apache.kudu.client.KuduTable rpcTable) {
     List<String> projectedCols = new ArrayList<>();
     for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
@@ -263,6 +263,9 @@ public class KuduScanNode extends ScanNode {
     }
     KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
     tokenBuilder.setProjectedColumnNames(projectedCols);
+    long split_size_hint = analyzer.getQueryOptions()
+        .getTargeted_kudu_scan_range_length();
+    if (split_size_hint > 0) tokenBuilder.setSplitSizeBytes(split_size_hint);
     for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);
     return tokenBuilder.build();
   }
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 9d62712..cfdecbe 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -1483,3 +1483,62 @@ class TestCreateSynchronizedTable(KuduTestSuite):
       except Exception as e:
         assert "Not allowed to set 'kudu.table_name' manually for" \
                " synchronized Kudu tables" in str(e)
+
+
+class TestKuduReadTokenSplit(KuduTestSuite):
+  """
+  This suite verifies impala's integration of Kudu's split token API.
+  """
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduReadTokenSplit, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_scanner(self, vector, unique_database):
+    """This runs explain query with variations of mt_dop and
+    targeted_kudu_scan_range_length to verify targeted_kudu_scan_range_length's
+    functionality."""
+    explain_query = "explain select * from tpch_kudu.lineitem "
+    plans = []
+
+    regular_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=None,
+      targeted_kudu_scan_range_length=None, plans=plans)
+
+    mt_dop_1_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=1,
+      targeted_kudu_scan_range_length=None, plans=plans)
+
+    # targeted_kudu_scan_range_length should be disabled by default and num instances
+    # will be equal to the number of partitions
+    with_mt_dop_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=10,
+      targeted_kudu_scan_range_length=None, plans=plans)
+
+    # This will result in more splits
+    with_mt_dop_and_low_range_len_num_inst = self.__get_num_scanner_instances(
+      explain_query, mt_dop=10, targeted_kudu_scan_range_length="8mb", plans=plans)
+
+    assert mt_dop_1_num_inst == regular_num_inst, str(plans)
+    assert regular_num_inst < with_mt_dop_num_inst, str(plans)
+    assert with_mt_dop_num_inst < with_mt_dop_and_low_range_len_num_inst, str(plans)
+
+  def __get_num_scanner_instances(self, explain_query, mt_dop,
+                                  targeted_kudu_scan_range_length, plans):
+    """This is a helper method that runs the explain query with the provided query
+    options (mt_dop and targeted_kudu_scan_range_length). Appends the generated plan to
+    'plans' and returns the num of kudu scanner instances """
+    regex = r'F00:PLAN FRAGMENT \[RANDOM\] hosts=3 instances=([0-9]+)'
+    self.client.set_configuration_option("explain_level", 3)
+    if targeted_kudu_scan_range_length:
+      self.client.set_configuration_option("targeted_kudu_scan_range_length",
+                                           targeted_kudu_scan_range_length)
+    if mt_dop:
+      self.client.set_configuration_option("mt_dop", mt_dop)
+    result = self.client.execute(explain_query)
+    plan = "\n".join(result.data)
+    plans.append(plan)
+    matches = re.search(regex, plan)
+    assert len(matches.groups()) == 1
+    self.client.clear_configuration()
+    return int(matches.group(1))