You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/10/10 02:01:39 UTC
[impala] branch master updated: IMPALA-9792: Add ability to split
kudu scan ranges
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 2fd6f5b IMPALA-9792: Add ability to split kudu scan ranges
2fd6f5b is described below
commit 2fd6f5bc5aa6b50e36547e52657c1117637384b6
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Aug 27 23:01:55 2020 -0700
IMPALA-9792: Add ability to split kudu scan ranges
This patch adds the ability to split Kudu scan tokens via the provided
Kudu Java API. A query option "TARGETED_KUDU_SCAN_RANGE_LENGTH" has
been added to set the target scan range length used when splitting tokens.
Potential benefit:
This helps increase parallelism during scanning, which can
result in more efficient use of CPU at higher mt_dop values.
Limitation:
- The scan range length sent to Kudu is just a hint and does not
guarantee that the token will be split at that limit.
- Comes at the added cost of one RPC to a tablet server per token in
order to split it. A slow tablet server, which can already slow
down scanning during execution, can now also potentially slow
down planning.
- Also adds the cost of one RPC per split token to open a new scanner for
it on the Kudu side. Therefore, scanning many smaller split
tokens can slow down scanning, and we can also lose the benefits
of scanning a single large token sequentially with a single scanner.
Testing:
- Added an e2e test
Change-Id: Ia02fd94cc1d13c61bc6cb0765dd2cbe90e9a5ce8
Reviewed-on: http://gerrit.cloudera.org:8080/16385
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/service/query-options-test.cc | 1 +
be/src/service/query-options.cc | 7 +++
be/src/service/query-options.h | 4 +-
common/thrift/ImpalaInternalService.thrift | 4 ++
common/thrift/ImpalaService.thrift | 7 +++
.../org/apache/impala/planner/KuduScanNode.java | 7 ++-
tests/query_test/test_kudu.py | 59 ++++++++++++++++++++++
7 files changed, 86 insertions(+), 3 deletions(-)
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 710412c..8d80447 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -154,6 +154,7 @@ TEST(QueryOptions, SetByteOptions) {
{MAKE_OPTIONDEF(broadcast_bytes_limit), {-1, I64_MAX}},
{MAKE_OPTIONDEF(preagg_bytes_limit), {-1, I64_MAX}},
{MAKE_OPTIONDEF(sort_run_bytes_limit), {-1, I64_MAX}},
+ {MAKE_OPTIONDEF(targeted_kudu_scan_range_length), {-1, I64_MAX}},
};
vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
{MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index dcc16c4..63e1b73 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -965,6 +965,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_enable_outer_join_to_inner_transformation(IsTrue(value));
break;
}
+ case TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH: {
+ int64_t scan_length = 0;
+ RETURN_IF_ERROR(
+ ParseMemValue(value, "targeted kudu scan range length", &scan_length));
+ query_options->__set_targeted_kudu_scan_range_length(scan_length);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index a56fb59..3b13f3b 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION + 1);\
+ TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -219,6 +219,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(enable_outer_join_to_inner_transformation,\
ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)\
+ QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,\
+ TQueryOptionLevel::ADVANCED)\
;
/// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 27716ed..bb22ab3 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -456,6 +456,10 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
114: optional bool enable_outer_join_to_inner_transformation = false;
+
+ // Initialized with -1 to indicate it is unspecified.
+ // See comment in ImpalaService.thrift
+ 115: optional i64 targeted_kudu_scan_range_length = -1;
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 09b74af..3e5e5af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -579,10 +579,17 @@ enum TImpalaQueryOptions {
// When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will
// be converted from UTC to local time. Writes are unaffected.
+
CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS = 112
// Indicates whether the FE should attempt to transform outer joins into inner joins.
ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION = 113
+
+ // Set the target scan range length for scanning kudu tables (in bytes). This is
+ // used to split kudu scan tokens and is treated as a hint by kudu. Therefore, it
+ // does not guarantee a limit on the size of the scan range. Leaving it unspecified
+ // or setting it to 0 (or any negative value) disables this feature.
+ TARGETED_KUDU_SCAN_RANGE_LENGTH = 114
}
// The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 4d54b7d..613c40b 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -214,7 +214,7 @@ public class KuduScanNode extends ScanNode {
throws ImpalaRuntimeException {
scanRangeSpecs_ = new TScanRangeSpec();
- List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable);
+ List<KuduScanToken> scanTokens = createScanTokens(analyzer, client, rpcTable);
for (KuduScanToken token: scanTokens) {
LocatedTablet tablet = token.getTablet();
List<TScanRangeLocation> locations = new ArrayList<>();
@@ -253,7 +253,7 @@ public class KuduScanNode extends ScanNode {
* will be pushed to Kudu. The projected Kudu columns are ordered by offset in an
* Impala tuple to make the Impala and Kudu tuple layouts identical.
*/
- private List<KuduScanToken> createScanTokens(KuduClient client,
+ private List<KuduScanToken> createScanTokens(Analyzer analyzer, KuduClient client,
org.apache.kudu.client.KuduTable rpcTable) {
List<String> projectedCols = new ArrayList<>();
for (SlotDescriptor desc: getTupleDesc().getSlotsOrderedByOffset()) {
@@ -263,6 +263,9 @@ public class KuduScanNode extends ScanNode {
}
KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
tokenBuilder.setProjectedColumnNames(projectedCols);
+ long split_size_hint = analyzer.getQueryOptions()
+ .getTargeted_kudu_scan_range_length();
+ if (split_size_hint > 0) tokenBuilder.setSplitSizeBytes(split_size_hint);
for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);
return tokenBuilder.build();
}
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 9d62712..cfdecbe 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -1483,3 +1483,62 @@ class TestCreateSynchronizedTable(KuduTestSuite):
except Exception as e:
assert "Not allowed to set 'kudu.table_name' manually for" \
" synchronized Kudu tables" in str(e)
+
+
+class TestKuduReadTokenSplit(KuduTestSuite):
+  """
+  This suite verifies Impala's integration of Kudu's split token API.
+  """
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestKuduReadTokenSplit, cls).add_test_dimensions()
+    # The default read mode of READ_LATEST does not provide high enough consistency for
+    # these tests.
+    add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
+
+  @SkipIfKudu.no_hybrid_clock
+  def test_kudu_scanner(self, vector, unique_database):
+    """Runs an explain query with variations of mt_dop and
+    targeted_kudu_scan_range_length to verify targeted_kudu_scan_range_length's
+    functionality."""
+    explain_query = "explain select * from tpch_kudu.lineitem "
+    plans = []
+
+    regular_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=None,
+        targeted_kudu_scan_range_length=None, plans=plans)
+
+    mt_dop_1_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=1,
+        targeted_kudu_scan_range_length=None, plans=plans)
+
+    # targeted_kudu_scan_range_length should be disabled by default and num instances
+    # will be equal to the number of partitions
+    with_mt_dop_num_inst = self.__get_num_scanner_instances(explain_query, mt_dop=10,
+        targeted_kudu_scan_range_length=None, plans=plans)
+
+    # A small targeted range length should result in more splits (and instances).
+    with_mt_dop_and_low_range_len_num_inst = self.__get_num_scanner_instances(
+        explain_query, mt_dop=10, targeted_kudu_scan_range_length="8mb", plans=plans)
+
+    assert mt_dop_1_num_inst == regular_num_inst, str(plans)
+    assert regular_num_inst < with_mt_dop_num_inst, str(plans)
+    assert with_mt_dop_num_inst < with_mt_dop_and_low_range_len_num_inst, str(plans)
+
+  def __get_num_scanner_instances(self, explain_query, mt_dop,
+      targeted_kudu_scan_range_length, plans):
+    """Runs 'explain_query' with the provided query options (mt_dop and
+    targeted_kudu_scan_range_length), appends the generated plan to 'plans' and
+    returns the number of kudu scanner instances; fails if F00 is not in the plan."""
+    regex = r'F00:PLAN FRAGMENT \[RANDOM\] hosts=3 instances=([0-9]+)'
+    self.client.set_configuration_option("explain_level", 3)
+    if targeted_kudu_scan_range_length:
+      self.client.set_configuration_option("targeted_kudu_scan_range_length",
+          targeted_kudu_scan_range_length)
+    if mt_dop:
+      self.client.set_configuration_option("mt_dop", mt_dop)
+    result = self.client.execute(explain_query)
+    self.client.clear_configuration()  # clear before the assert below
+    plan = "\n".join(result.data)
+    plans.append(plan)
+    matches = re.search(regex, plan)
+    assert matches is not None, "scanner fragment not found in plan:\n" + plan
+    return int(matches.group(1))