You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/05/24 12:25:51 UTC

[impala] branch master updated: IMPALA-10197: Add KUDU_REPLICA_SELECTION query option

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e11237e  IMPALA-10197: Add KUDU_REPLICA_SELECTION query option
e11237e is described below

commit e11237e29ed3a41dd361dd7a541f9702d0d0b16b
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Fri Apr 30 18:25:42 2021 -0700

    IMPALA-10197: Add KUDU_REPLICA_SELECTION query option
    
    Sometimes it is useful to target queries at the leader replica only,
    instead of the default closest replica.
    This patch adds a new query option, KUDU_REPLICA_SELECTION, which
    controls which replica is chosen when a Kudu tablet has multiple
    replicas.
    It removes the flag FLAGS_pick_only_leaders_for_tests, since its usage
    can be replaced by the new query option.
    It also adds new planner and end-to-end tests for the new query option.
    
    Testing:
      - Passed exhaustive tests.
    
    Change-Id: I613e6d9be8680c05880f7cf962a31aa38931f3d9
    Reviewed-on: http://gerrit.cloudera.org:8080/17396
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/kudu-scanner.cc                        |   5 +-
 be/src/service/query-options-test.cc               |   3 +
 be/src/service/query-options.cc                    |   8 ++
 be/src/service/query-options.h                     |   8 +-
 be/src/util/debug-util.cc                          |   1 +
 be/src/util/debug-util.h                           |   1 +
 common/thrift/ImpalaService.thrift                 |   5 +
 common/thrift/Query.thrift                         |   9 ++
 .../org/apache/impala/planner/KuduScanNode.java    |  20 ++-
 .../org/apache/impala/planner/PlannerTest.java     |  12 ++
 .../kudu-replica-selection-closest-replica.test    | 144 +++++++++++++++++++++
 .../kudu-replica-selection-leader-only.test        | 143 ++++++++++++++++++++
 tests/query_test/test_kudu.py                      |  17 +++
 13 files changed, 368 insertions(+), 8 deletions(-)

diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 348ecd6..ff6b94e 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -60,8 +60,6 @@ using kudu::client::KuduValue;
 DEFINE_string(kudu_read_mode, "READ_LATEST", "(Advanced) Sets the Kudu scan ReadMode. "
     "Supported Kudu read modes are READ_LATEST and READ_AT_SNAPSHOT. Can be overridden "
     "with the query option of the same name.");
-DEFINE_bool(pick_only_leaders_for_tests, false,
-            "Whether to pick only leader replicas, for tests purposes only.");
 DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
     "The period at which Kudu Scanners should send keep-alive requests to the tablet "
     "server to ensure that scanners do not time out.");
@@ -192,7 +190,8 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token, bool* eos) {
       BuildErrorString("Unable to deserialize scan token"));
   scanner_.reset(scanner);
 
-  if (UNLIKELY(FLAGS_pick_only_leaders_for_tests)) {
+  if (state_->query_options().kudu_replica_selection
+      == TKuduReplicaSelection::LEADER_ONLY) {
     KUDU_RETURN_IF_ERROR(scanner_->SetSelection(kudu::client::KuduClient::LEADER_ONLY),
         BuildErrorString("Could not set replica selection"));
   }
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 4212b58..f7eab4b 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -236,6 +236,9 @@ TEST(QueryOptions, SetEnumOptions) {
       CASE(enabled_runtime_filter_types, TEnabledRuntimeFilterTypes,
           (BLOOM, MIN_MAX, ALL)),
       true);
+  TestEnumCase(options,
+      CASE(kudu_replica_selection, TKuduReplicaSelection, (LEADER_ONLY, CLOSEST_REPLICA)),
+      true);
 #undef CASE
 #undef ENTRIES
 #undef ENTRY
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 2862aee..a7a88d8 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1076,6 +1076,14 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_default_ndv_scale(scale);
         break;
       }
+      case TImpalaQueryOptions::KUDU_REPLICA_SELECTION: {
+        // Parse the kudu replica selection and validate it.
+        TKuduReplicaSelection::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "kudu replica selection",
+            _TKuduReplicaSelection_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_kudu_replica_selection(enum_type);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index f2da309..663c9e4 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DEFAULT_NDV_SCALE + 1);\
+      TImpalaQueryOptions::KUDU_REPLICA_SELECTION + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -242,7 +242,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS,\
       TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)
+  QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(kudu_replica_selection, KUDU_REPLICA_SELECTION,\
+      TQueryOptionLevel::ADVANCED)
 ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
@@ -268,7 +270,7 @@ std::string DebugQueryOptions(const TQueryOptions& query_options);
 
 /// Bitmask for the values of TQueryOptions.
 /// TODO: Find a way to set the size based on the number of fields.
-typedef std::bitset<128> QueryOptionsMask;
+typedef std::bitset<192> QueryOptionsMask;
 
 /// Updates the query options in dst from those in src where the query option is set
 /// (i.e. src->__isset.PROPERTY is true) and the corresponding bit in mask is set. If
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 711ab83..25e2697 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -103,6 +103,7 @@ PRINT_THRIFT_ENUM_IMPL(TParquetTimestampType)
 PRINT_THRIFT_ENUM_IMPL(TTransactionalType)
 PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilteringLevel)
+PRINT_THRIFT_ENUM_IMPL(TKuduReplicaSelection)
 
 string PrintId(const TUniqueId& id, const string& separator) {
   stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 7f8d962..7cb8388 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -81,6 +81,7 @@ std::string PrintThriftEnum(const TParquetTimestampType::type& value);
 std::string PrintThriftEnum(const TTransactionalType::type& value);
 std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
 std::string PrintThriftEnum(const TMinmaxFilteringLevel::type& value);
+std::string PrintThriftEnum(const TKuduReplicaSelection::type& value);
 
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
 std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index da079c5..774111e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -656,6 +656,11 @@ enum TImpalaQueryOptions {
   // Default NDV scale settings, make it easier to change scale in SQL function
   // NDV(<expr>).
   DEFAULT_NDV_SCALE = 126
+
+  // Policy with which to choose amongst multiple Kudu replicas.
+  //     LEADER_ONLY     - Select the LEADER replica.
+  //     CLOSEST_REPLICA - Select the closest replica to the client (default).
+  KUDU_REPLICA_SELECTION = 127
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index a03f84b..3ee83a9 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -52,6 +52,11 @@ enum TKuduReadMode {
   READ_AT_SNAPSHOT = 2
 }
 
+enum TKuduReplicaSelection {
+  LEADER_ONLY = 0
+  CLOSEST_REPLICA = 1
+}
+
 enum TJoinDistributionMode {
   BROADCAST = 0
   SHUFFLE = 1
@@ -493,6 +498,10 @@ struct TQueryOptions {
 
   // Default NDV scale
   127: optional i32 default_ndv_scale = 2;
+
+  // See comment in ImpalaService.thrift
+  128: optional TKuduReplicaSelection kudu_replica_selection =
+      TKuduReplicaSelection.CLOSEST_REPLICA;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 67cd5e4..3ab8ded 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -46,6 +46,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TKuduReplicaSelection;
 import org.apache.impala.thrift.TKuduScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPlanNode;
@@ -65,6 +66,7 @@ import org.apache.kudu.client.KuduPredicate.ComparisonOp;
 import org.apache.kudu.client.KuduScanToken;
 import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder;
 import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.consensus.Metadata.RaftPeerPB.Role;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +100,9 @@ public class KuduScanNode extends ScanNode {
   // Set in computeNodeResourceProfile().
   private boolean useMtScanNode_;
 
+  // True if the query option of replica selection is set as leader-only.
+  private boolean replicaSelectionLeaderOnly_ = false;
+
   // Indexes for the set of hosts that will be used for the query.
   // From analyzer.getHostIndex().getIndex(address)
   private final Set<Integer> hostIndexSet_ = new HashSet<>();
@@ -217,6 +222,8 @@ public class KuduScanNode extends ScanNode {
       throws ImpalaRuntimeException {
     scanRangeSpecs_ = new TScanRangeSpec();
 
+    replicaSelectionLeaderOnly_ = (analyzer.getQueryOptions().getKudu_replica_selection()
+        == TKuduReplicaSelection.LEADER_ONLY);
     List<KuduScanToken> scanTokens = createScanTokens(analyzer, client, rpcTable);
     for (KuduScanToken token: scanTokens) {
       LocatedTablet tablet = token.getTablet();
@@ -228,6 +235,13 @@ public class KuduScanNode extends ScanNode {
       }
 
       for (LocatedTablet.Replica replica: tablet.getReplicas()) {
+        // Skip non-leader replicas if query option KUDU_REPLICA_SELECTION is set as
+        // LEADER_ONLY.
+        if (replicaSelectionLeaderOnly_
+            && !replica.getRole().equals(Role.LEADER.toString())) {
+          continue;
+        }
+
         TNetworkAddress address =
             new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
         // Use the network address to look up the host in the global list
@@ -406,8 +420,10 @@ public class KuduScanNode extends ScanNode {
     StringBuilder result = new StringBuilder();
 
     String aliasStr = desc_.hasExplicitAlias() ? " " + desc_.getAlias() : "";
-    result.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
-        kuduTable_.getFullName(), aliasStr));
+    result.append(
+        String.format(replicaSelectionLeaderOnly_ ? "%s%s:%s [%s%s, LEADER-only]\n" :
+                                                    "%s%s:%s [%s%s]\n",
+            prefix, id_.toString(), displayName_, kuduTable_.getFullName(), aliasStr));
 
     switch (detailLevel) {
       case MINIMAL: break;
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index b5ae17a..4514add 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -36,6 +36,7 @@ import org.apache.impala.thrift.TEnabledRuntimeFilterTypes;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TJoinDistributionMode;
+import org.apache.impala.thrift.TKuduReplicaSelection;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TRuntimeFilterMode;
@@ -686,6 +687,17 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testKuduReplicaSelection() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.VERBOSE);
+    options.setKudu_replica_selection(TKuduReplicaSelection.LEADER_ONLY);
+    runPlannerTestFile("kudu-replica-selection-leader-only", options);
+
+    options.setKudu_replica_selection(TKuduReplicaSelection.CLOSEST_REPLICA);
+    runPlannerTestFile("kudu-replica-selection-closest-replica", options);
+  }
+
+  @Test
   public void testKuduTpch() {
     TQueryOptions options = defaultQueryOptions();
     options.setEnabled_runtime_filter_types(TEnabledRuntimeFilterTypes.ALL);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test
new file mode 100644
index 0000000..c15a708
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-closest-replica.test
@@ -0,0 +1,144 @@
+select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     kudu predicates: id = '8600000US00601'
+     mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     kudu predicates: id = '8600000US00601'
+     mem-estimate=1.88MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+====
+# The cardinality from "zip = '2'" should dominate.
+select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     predicates: id != '1'
+     kudu predicates: zip = '2'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     predicates: id != '1'
+     kudu predicates: zip = '2'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     kudu predicates: zip > '2', id > '1'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=517.93KB mem-reservation=0B thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=517.93KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     kudu predicates: zip > '2', id > '1'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' or zip = '3'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     predicates: id IN ('1', '2') OR zip = '3'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes]
+     predicates: id IN ('1', '2') OR zip = '3'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+====
+
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test
new file mode 100644
index 0000000..20d958f
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-replica-selection-leader-only.test
@@ -0,0 +1,143 @@
+select * from functional_kudu.zipcode_incomes where id = '8600000US00601'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=7.75MB mem-reservation=4.00MB thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     kudu predicates: id = '8600000US00601'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=3.75MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     kudu predicates: id = '8600000US00601'
+     mem-estimate=3.75MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+====
+# The cardinality from "zip = '2'" should dominate.
+select * from functional_kudu.zipcode_incomes where id != '1' and zip = '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     predicates: id != '1'
+     kudu predicates: zip = '2'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     predicates: id != '1'
+     kudu predicates: zip = '2'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=1
+     in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id > '1' and zip > '2'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     kudu predicates: zip > '2', id > '1'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.45MB mem-reservation=4.00MB thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=456.89KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     kudu predicates: zip > '2', id > '1'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3.32K
+     in pipelines: 00(GETNEXT)
+====
+select * from functional_kudu.zipcode_incomes where id = '1' or id = '2' or zip = '3'
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=9.62MB mem-reservation=4.00MB thread-reservation=2
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     predicates: id IN ('1', '2') OR zip = '3'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: functional_kudu.zipcode_incomes.id, functional_kudu.zipcode_incomes.zip, functional_kudu.zipcode_incomes.description1, functional_kudu.zipcode_incomes.description2, functional_kudu.zipcode_incomes.income
+  |  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+  |
+  01:EXCHANGE [UNPARTITIONED]
+     mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=2 instances=2
+Per-Host Resources: mem-estimate=5.62MB mem-reservation=0B thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  00:SCAN KUDU [functional_kudu.zipcode_incomes, LEADER-only]
+     predicates: id IN ('1', '2') OR zip = '3'
+     mem-estimate=5.62MB mem-reservation=0B thread-reservation=1
+     tuple-ids=0 row-size=124B cardinality=3
+     in pipelines: 00(GETNEXT)
+====
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index dd7590c..5d77540 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -550,6 +550,23 @@ class TestKuduOperations(KuduTestSuite):
     where id > 2 limit 100""" % table_name)
     self._retry_query(cursor, "select count(*) from %s" % table_name, [(103,)])
 
+  def test_replica_selection(self, cursor, unique_database):
+    """This test verifies that scans work as expected with different replica selection.
+    """
+    table_name = "%s.replica_selection" % unique_database
+    cursor.execute("""create table %s (a int primary key, b string) partition by hash(a)
+        partitions 8 stored as kudu""" % table_name)
+    cursor.execute("""insert into %s select id, string_col from functional.alltypes
+        limit 100""" % table_name)
+
+    cursor.execute("set kudu_replica_selection=LEADER_ONLY")
+    cursor.execute("select count(*) from %s" % table_name)
+    assert cursor.fetchall() == [(100,)]
+
+    cursor.execute("set kudu_replica_selection=CLOSEST_REPLICA")
+    cursor.execute("select count(*) from %s" % table_name)
+    assert cursor.fetchall() == [(100,)]
+
 
 class TestKuduPartitioning(KuduTestSuite):
   @classmethod