You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/05/24 12:25:51 UTC
[impala] branch master updated: IMPALA-10197: Add KUDU_REPLICA_SELECTION query option
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new e11237e IMPALA-10197: Add KUDU_REPLICA_SELECTION query option
e11237e is described below
commit e11237e29ed3a41dd361dd7a541f9702d0d0b16b
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Apr 30 18:25:42 2021 -0700
IMPALA-10197: Add KUDU_REPLICA_SELECTION query option
Sometimes it is useful to target queries at the leader replica only,
instead of the default closest replica.
This patch adds a new query option, KUDU_REPLICA_SELECTION, which sets the
policy (LEADER_ONLY or CLOSEST_REPLICA) for choosing among multiple Kudu
replicas.
Removed the flag FLAGS_pick_only_leaders_for_tests, since its usage can
be replaced by the new query option.
Added new planner and end-to-end tests for the new query option.
Testing:
- Passed exhaustive tests.
Change-Id: I613e6d9be8680c05880f7cf962a31aa38931f3d9
Reviewed-on: http://gerrit.cloudera.org:8080/17396
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/kudu-scanner.cc | 5 +-
be/src/service/query-options-test.cc | 3 +
be/src/service/query-options.cc | 8 ++
be/src/service/query-options.h | 8 +-
be/src/util/debug-util.cc | 1 +
be/src/util/debug-util.h | 1 +
common/thrift/ImpalaService.thrift | 5 +
common/thrift/Query.thrift | 9 ++
.../org/apache/impala/planner/KuduScanNode.java | 20 ++-
.../org/apache/impala/planner/PlannerTest.java | 12 ++
.../kudu-replica-selection-closest-replica.test | 144 +++++++++++++++++++++
.../kudu-replica-selection-leader-only.test | 143 ++++++++++++++++++++
tests/query_test/test_kudu.py | 17 +++
13 files changed, 368 insertions(+), 8 deletions(-)
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 348ecd6..ff6b94e 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -60,8 +60,6 @@ using kudu::client::KuduValue;
DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
"Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Can be overridden "
"with the query option of the same name.");
-DEFINE_bool(pick_only_leaders_for_tests, false,
- "Whether to pick only leader replicas, for tests purposes only.");
DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
"The period at which Kudu Scanners should send keep-alive requests to the tablet "
"server to ensure that scanners do not time out.");
@@ -192,7 +190,8 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
BuildErrorString("Unable to deserialize scan token"));
scanner_.reset(scanner);
- if (UNLIKELY(FLAGS_pick_only_leaders_for_tests)) {
+ if (state_->query_options().kudu_replica_selection
+ == TKuduReplicaSelection::LEADER_ONLY) {
KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
BuildErrorString("Could not set replica selection"));
}
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 4212b58..f7eab4b 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -236,6 +236,9 @@ TEST(QueryOptions, SetEnumOptions) {
CASE(enabled_runtime_filter_types, TEnabledRuntimeFilterTypes,
(BLOOM, MIN_MAX, ALL)),
true);
+ TestEnumCase(options,
+ CASE(kudu_replica_selection, TKuduReplicaSelection, (LEADER_ONLY, CLOSEST_REPLICA)),
+ true);
#undef CASE
#undef ENTRIES
#undef ENTRY
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 2862aee..a7a88d8 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1076,6 +1076,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_default_ndv_scale(scale);
break;
}
+ case TImpalaQueryOptions::KUDU_REPLICA_SELECTION: {
+ // Parse the kudu replica selection and validate it.
+ TKuduReplicaSelection::type enum_type;
+ RETURN_IF_ERROR(GetThriftEnum(value, "kudu replica selection",
+ _TKuduReplicaSelection_VALUES_TO_NAMES, &enum_type));
+ query_options->__set_kudu_replica_selection(enum_type);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index f2da309..663c9e4 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::DEFAULT_NDV_SCALE + 1);\
+ TImpalaQueryOptions::KUDU_REPLICA_SELECTION + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -242,7 +242,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS,\
TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)
+ QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)\
+ QUERY_OPT_FN(kudu_replica_selection, KUDU_REPLICA_SELECTION,\
+ TQueryOptionLevel::ADVANCED)
;
/// Enforce practical limits on some query options to avoid undesired query state.
@@ -268,7 +270,7 @@ std::string DebugQueryOptions(const TQueryOptions& query_options);
/// Bitmask for the values of TQueryOptions.
/// TODO: Find a way to set the size based on the number of fields.
-typedef std::bitset<128> QueryOptionsMask;
+typedef std::bitset<192> QueryOptionsMask;
/// Updates the query options in dst from those in src where the query option is set
/// (i.e. src->__isset.PROPERTY is true) and the corresponding bit in mask is set. If
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 711ab83..25e2697 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -103,6 +103,7 @@ PRINT_THRIFT_ENUM_IMPL(TParquetTimestampType)
PRINT_THRIFT_ENUM_IMPL(TTransactionalType)
PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
PRINT_THRIFT_ENUM_IMPL(TMinmaxFilteringLevel)
+PRINT_THRIFT_ENUM_IMPL(TKuduReplicaSelection)
string PrintId(const TUniqueId& id, const string& separator) {
stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 7f8d962..7cb8388 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -81,6 +81,7 @@ std::string PrintThriftEnum(const TParquetTimestampType::type& value);
std::string PrintThriftEnum(const TTransactionalType::type& value);
std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
std::string PrintThriftEnum(const TMinmaxFilteringLevel::type& value);
+std::string PrintThriftEnum(const TKuduReplicaSelection::type& value);
std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index da079c5..774111e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -656,6 +656,11 @@ enum TImpalaQueryOptions {
// Default NDV scale settings, make it easier to change scale in SQL function
// NDV(<expr>).
DEFAULT_NDV_SCALE = 126
+
+ // Policy with which to choose amongst multiple Kudu replicas.
+ // LEADER_ONLY - Select the LEADER replica.
+ // CLOSEST_REPLICA - Select the closest replica to the client (default).
+ KUDU_REPLICA_SELECTION = 127
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index a03f84b..3ee83a9 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -52,6 +52,11 @@ enum TKuduReadMode {
READ_AT_SNAPSHOT = 2
}
+enum TKuduReplicaSelection {
+ LEADER_ONLY = 0
+ CLOSEST_REPLICA = 1
+}
+
enum TJoinDistributionMode {
BROADCAST = 0
SHUFFLE = 1
@@ -493,6 +498,10 @@ struct TQueryOptions {
// Default NDV scale
127: optional i32 default_ndv_scale = 2;
+
+ // See comment in ImpalaService.thrift
+ 128: optional TKuduReplicaSelection kudu_replica_selection =
+ TKuduReplicaSelection.CLOSEST_REPLICA;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 67cd5e4..3ab8ded 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -46,6 +46,7 @@ import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TKuduReplicaSelection;
import org.apache.impala.thrift.TKuduScanNode;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPlanNode;
@@ -65,6 +66,7 @@ import org.apache.kudu.client.KuduPredicate.ComparisonOp;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder;
import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +100,9 @@ public class KuduScanNode extends ScanNode {
// Set in computeNodeResourceProfile().
private boolean useMtScanNode_;
+ // True if the query option of replica selection is set as leader-only.
+ private boolean replicaSelectionLeaderOnly_ = false;
+
// Indexes for the set of hosts that will be used for the query.
// From analyzer.getHostIndex().getIndex(address)
private final Set<Integer> hostIndexSet_ = new HashSet<>();
@@ -217,6 +222,8 @@ public class KuduScanNode extends ScanNode {
throws ImpalaRuntimeException {
scanRangeSpecs_ = new TScanRangeSpec();
+ replicaSelectionLeaderOnly_ = (analyzer.getQueryOptions().getKudu_replica_selection()
+ == TKuduReplicaSelection.LEADER_ONLY);
List<KuduScanToken> scanTokens = createScanTokens(analyzer, client, rpcTable);
for (KuduScanToken token: scanTokens) {
LocatedTablet tablet = token.getTablet();
@@ -228,6 +235,13 @@ public class KuduScanNode extends ScanNode {
}
for (LocatedTablet.Replica replica: tablet.getReplicas()) {
+ // Skip non-leader replicas if query option KUDU_REPLICA_SELECTION is set as
+ // LEADER_ONLY.
+ if (replicaSelectionLeaderOnly_
+ && !replica.getRole().equals(Role.LEADER.toString())) {
+ continue;
+ }
+
TNetworkAddress address =
new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
// Use the network address to look up the host in the global list
@@ -406,8 +420,10 @@ public class KuduScanNode extends ScanNode {
StringBuilder result = new StringBuilder();
String aliasStr = desc_.hasExplicitAlias() ? " " + desc_.getAlias() : "";
- result.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
- kuduTable_.getFullName(), aliasStr));
+ result.append(
+ String.format(replicaSelectionLeaderOnly_ ? "%s%s:%s [%s%s, LEADER-only]\n" :
+ "%s%s:%s [%s%s]\n",
+ prefix, id_.toString(), displayName_, kuduTable_.getFullName(), aliasStr));
switch (detailLevel) {
case MINIMAL: break;
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b5ae17a..4514add 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -36,6 +36,7 @@ import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
import org.apache.impala.thrift.TExecRequest;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.TJoinDistributionMode;
+import org.apache.impala.thrift.TKuduReplicaSelection;
import org.apache.impala.thrift.TQueryCtx;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -686,6 +687,17 @@ public class PlannerTest extends PlannerTestBase {
}
@Test
+ public void testKuduReplicaSelection() {
+ TQueryOptions options = defaultQueryOptions();
+ options.setExplain_level(TExplainLevel.VERBOSE);
+ options.setKudu_replica_selection(TKuduReplicaSelection.LEADER_ONLY);
+ runPlannerTestFile("kudu-replica-selection-leader-only", options);
+
+ options.setKudu_replica_selection(TKuduReplicaSelection.CLOSEST_REPLICA);
+ runPlannerTestFile("kudu-replica-selection-closest-replica", options);
+ }
+
+ @Test
public void testKuduTpch() {
TQueryOptions options = defaultQueryOptions();
options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test
new file mode 100644
index 0000000..c15a708
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test
@@ -0,0 +1,144 @@
+select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ kudu predicates: id = '8600000US00601'
+ mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ kudu predicates: id = '8600000US00601'
+ mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
+# The cardinality from "zip = '2'" should dominate.
+select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ predicates: id != '1'
+ kudu predicates: zip = '2'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ predicates: id != '1'
+ kudu predicates: zip = '2'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ kudu predicates: zip > '2', id > '1'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=517.93KB mem-reservation=0B thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=517.93KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ kudu predicates: zip > '2', id > '1'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' or zip = '3'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ predicates: id IN ('1', '2') OR zip = '3'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes]
+ predicates: id IN ('1', '2') OR zip = '3'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+====
+
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test
new file mode 100644
index 0000000..20d958f
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test
@@ -0,0 +1,143 @@
+select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=7.75MB mem-reservation=4.00MB thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ kudu predicates: id = '8600000US00601'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ kudu predicates: id = '8600000US00601'
+ mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
+# The cardinality from "zip = '2'" should dominate.
+select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ predicates: id != '1'
+ kudu predicates: zip = '2'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ predicates: id != '1'
+ kudu predicates: zip = '2'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=1
+ in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ kudu predicates: zip > '2', id > '1'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.45MB mem-reservation=4.00MB thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=456.89KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ kudu predicates: zip > '2', id > '1'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3.32K
+ in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' or zip = '3'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ predicates: id IN ('1', '2') OR zip = '3'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+ PLAN-ROOT SINK
+ | output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+ | mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+ |
+ 01:EXCHANGE [UNPARTITIONED]
+ mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+ DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+ | mem-estimate=0B mem-reservation=0B thread-reservation=0
+ 00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+ predicates: id IN ('1', '2') OR zip = '3'
+ mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+ tuple-ids=0 row-size=124B cardinality=3
+ in pipelines: 00(GETNEXT)
+====
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index dd7590c..5d77540 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -550,6 +550,23 @@ class TestKuduOperations(KuduTestSuite):
where id > 2 limit 100""" % table_name)
self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)])
+ def test_replica_selection(self, cursor, unique_database):
+ """This test verifies that scans work as expected with different replica selection.
+ """
+ table_name = "%s.replica_selection" % unique_database
+ cursor.execute("""create table %s (a int primary key, b string) partition by hash(a)
+ partitions 8 stored as kudu""" % table_name)
+ cursor.execute("""insert into %s select id, string_col from functional.alltypes
+ limit 100""" % table_name)
+
+ cursor.execute("set kudu_replica_selection=LEADER_ONLY")
+ cursor.execute("select count(*) from %s" % table_name)
+ assert cursor.fetchall() == [(100,)]
+
+ cursor.execute("set kudu_replica_selection=CLOSEST_REPLICA")
+ cursor.execute("select count(*) from %s" % table_name)
+ assert cursor.fetchall() == [(100,)]
+
class TestKuduPartitioning(KuduTestSuite):
@classmethod