You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ba...@apache.org on 2020/06/29 16:38:16 UTC

[kudu] branch master updated: [perf] KUDU-3140 Heuristics to disable predicate evaluation for Bloom filter

This is an automated email from the ASF dual-hosted git repository.

bankim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f1905eb  [perf] KUDU-3140 Heuristics to disable predicate evaluation for Bloom filter
f1905eb is described below

commit f1905eb2713418294daf71f67b48ea67fb09403e
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Jun 3 11:37:24 2020 -0700

    [perf] KUDU-3140 Heuristics to disable predicate evaluation for Bloom filter
    
    Column predicate evaluation can be expensive and ineffective column predicates
    can waste CPU. TPCH Q9 exhibits significant regression of 50-96% on enabling
    Bloom filter predicates. See KUDU-3140 for details.
    
    Excerpt from TPCH run exhibiting regression:
    https://gist.github.com/bbhavsar/943cf8ebbab63f598353efef8f87db32
    TPCH Q9 specific info:
    https://gist.github.com/bbhavsar/811ccbe0cd144090f82bdabcd801f827
    
    This change adds simple heuristic taken from HDFS scanner in Impala
    that basically checks for every 16 blocks and if a predicate has rejected
    less than 10% of the rows scanned then disables the predicate.
    
    To match the equivalent number of rows in Kudu, the check is made
    every 128 blocks by default.
    
    The stats collection and enforcement is enabled only for disableable
    predicate types, Bloom filter for now.
    
    With Bloom filter predicate type, false positives are expected so
    client is expected to do further filtering to remove false positives.
    Kudu makes the decision to disable the predicate independently and doesn't
    inform the client in this change which is okay for Bloom filter given
    the rationale above. Client API docs have been updated accordingly.
    
    Added a tablet level metric to track disabled column predicates.
    
    Tests with PS6:
    - TPCH no longer reports regression with Q9. With multiple runs,
      the delta are +1.95%, -24.67%, +2.67%, -17.09%, -14.59% with a std dev
      of 17% - 38% to report it neither as improvement nor as regression.
      https://gist.github.com/bbhavsar/0a773359b9225f014d353759a535c5be
    - Improvements with other queries reported before this change remain intact.
    
    Change-Id: I10197800a01a1b34c7821ac879caf8d272cab8dd
    Reviewed-on: http://gerrit.cloudera.org:8080/16036
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/client-test.cc                   |  15 +-
 src/kudu/client/client.h                         |  14 ++
 src/kudu/client/predicate-test.cc                | 179 +++++++++++++++++---
 src/kudu/common/CMakeLists.txt                   |   1 +
 src/kudu/common/column_materialization_context.h |   5 +-
 src/kudu/common/generic_iterators-test.cc        | 167 ++++++++++++++++++-
 src/kudu/common/generic_iterators.cc             | 199 +++++++++++++++++++----
 src/kudu/common/generic_iterators.h              |   2 +-
 src/kudu/common/iterator_stats.cc                |  10 +-
 src/kudu/common/iterator_stats.h                 |   6 +
 src/kudu/common/predicate_effectiveness.cc       |  95 +++++++++++
 src/kudu/common/predicate_effectiveness.h        | 116 +++++++++++++
 src/kudu/mini-cluster/internal_mini_cluster.cc   |  21 +++
 src/kudu/mini-cluster/internal_mini_cluster.h    |   3 +
 src/kudu/tablet/tablet_metrics.cc                |   8 +
 src/kudu/tablet/tablet_metrics.h                 |   1 +
 src/kudu/tserver/tablet_service.cc               |   1 +
 17 files changed, 768 insertions(+), 75 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 4c57900..0637bf9 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -327,15 +327,6 @@ class ClientTest : public KuduTest {
     return resp.tablet_locations(0).tablet_id();
   }
 
-  void FlushTablet(const string& tablet_id) {
-    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-      scoped_refptr<TabletReplica> tablet_replica;
-      ASSERT_TRUE(cluster_->mini_tablet_server(i)->server()->tablet_manager()->LookupTablet(
-          tablet_id, &tablet_replica));
-      ASSERT_OK(tablet_replica->tablet()->Flush());
-    }
-  }
-
   void CheckNoRpcOverflow() {
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       MiniTabletServer* server = cluster_->mini_tablet_server(i);
@@ -451,7 +442,7 @@ class ClientTest : public KuduTest {
     KuduScanner scanner(client_table_.get());
     string tablet_id = GetFirstTabletId(client_table_.get());
     // Flush to ensure we scan disk later
-    FlushTablet(tablet_id);
+    ASSERT_OK(cluster_->FlushTablet(tablet_id));
     ASSERT_OK(scanner.SetProjectedColumnNames({ "key" }));
     LOG_TIMING(INFO, "Scanning disk with no predicates") {
       ASSERT_OK(scanner.Open());
@@ -1551,7 +1542,7 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
       // disk.
       if (with_flush) {
         string tablet_id = GetFirstTabletId(table.get());
-        FlushTablet(tablet_id);
+        ASSERT_OK(cluster_->FlushTablet(tablet_id));
       }
 
       // Test a few different recoverable server-side error conditions.
@@ -6619,7 +6610,7 @@ TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
     // Leave one row in the tablet's MRS so that the scan includes one rowset
     // without bounds. This affects the behavior of FT scans.
     if (i < kNumRows - 1 && rng.OneIn(2)) {
-      NO_FATALS(FlushTablet(GetFirstTabletId(table.get())));
+      ASSERT_OK(cluster_->FlushTablet(GetFirstTabletId(table.get())));
     }
   }
 
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 82ab546..d8cfb42 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1104,6 +1104,13 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
   /// hand with IN Bloom filter predicate large number of values can be tested
   /// for membership in a space-efficient manner.
   ///
+  /// IN Bloom filter predicate may be automatically disabled if determined to
+  /// be ineffective in filtering rows during scan requests.
+  ///
+  /// Users are expected to perform further filtering to guard against false
+  /// positives and automatic disablement of an ineffective Bloom filter
+  /// predicate to get precise set membership information.
+  ///
   /// @param [in] col_name
   ///   Name of the column to which the predicate applies.
   /// @param [in] bloom_filters
@@ -1135,6 +1142,13 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
   /// hand with IN Bloom filter predicate large number of values can be tested
   /// for membership in a space-efficient manner.
   ///
+  /// IN Bloom filter predicate may be automatically disabled if determined to
+  /// be ineffective in filtering rows during scan requests.
+  ///
+  /// Users are expected to perform further filtering to guard against false
+  /// positives and automatic disablement of an ineffective Bloom filter
+  /// predicate to get precise set membership information.
+  ///
   /// @param [in] col_name
   ///   Name of the column to which the predicate applies.
   /// @param bloom_filters
diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc
index 3d17c8a..af59474 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -19,6 +19,7 @@
 #include <cmath>
 #include <cstddef>
 #include <cstdint>
+#include <deque>
 #include <initializer_list>
 #include <iterator>
 #include <limits>
@@ -30,6 +31,7 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -44,11 +46,15 @@
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/decimal_util.h"
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/int128.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
@@ -57,7 +63,12 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_int32(predicate_effectivess_num_skip_blocks);
+METRIC_DECLARE_counter(scanner_predicates_disabled);
+METRIC_DECLARE_entity(tablet);
+
 using std::count_if;
+using std::deque;
 using std::numeric_limits;
 using std::string;
 using std::unique_ptr;
@@ -341,6 +352,27 @@ class PredicateTest : public KuduTest {
     return bf;
   }
 
+  template<typename BloomFilterType, typename Collection>
+  static void InsertValuesInBloomFilter(BloomFilterType* bloom_filter, const Collection& values) {
+    for (const auto& v : values) {
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
+    }
+  }
+
+  template<class Collection>
+  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
+    KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
+    InsertValuesInBloomFilter(bloom_filter, values);
+    return bloom_filter;
+  }
+
+  template<class Collection>
+  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
+    unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
+    InsertValuesInBloomFilter(bloom_filter.get(), values);
+    return bloom_filter;
+  }
+
   void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
                                    KuduBloomFilter* in_bloom_filter,
                                    int expected_count) {
@@ -584,6 +616,30 @@ class PredicateTest : public KuduTest {
     ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
   }
 
+  string GetTheOnlyTabletId() {
+    // Cluster is setup with single tablet server and a single table.
+    CHECK_EQ(1, cluster_->num_tablet_servers());
+    const auto* mini_tserver = cluster_->mini_tablet_server(0);
+    const auto& tablets = mini_tserver->ListTablets();
+    CHECK_EQ(1, tablets.size());
+    return tablets[0];
+  }
+
+  // Function to verify ineffective disabled predicates on specified tablet.
+  void CheckDisabledPredicates(const string& tablet_id, int64_t expected_disabled_predicates) {
+    // Cluster is setup with single tablet server and a single table.
+    ASSERT_EQ(1, cluster_->num_tablet_servers());
+    const auto* mini_tserver = cluster_->mini_tablet_server(0);
+    int64_t predicates_disabled = 0;
+    ASSERT_OK(itest::GetInt64Metric(HostPort(mini_tserver->bound_http_addr()),
+                                    &METRIC_ENTITY_tablet,
+                                    tablet_id.c_str(),
+                                    &METRIC_scanner_predicates_disabled,
+                                    "value",
+                                    &predicates_disabled));
+    ASSERT_EQ(expected_disabled_predicates, predicates_disabled);
+  }
+
   shared_ptr<KuduClient> client_;
   unique_ptr<InternalMiniCluster> cluster_;
 };
@@ -1295,8 +1351,7 @@ class BloomFilterPredicateTest : public PredicateTest {
   // one with 'num_included_values' values from the table, and one with 'num_excluded_values'
   // values that aren't present in the table.
   void Init(int num_all_values, int num_included_values, int num_excluded_values) {
-    ASSERT_LT(num_included_values, num_all_values);
-    ASSERT_LT(num_excluded_values, num_all_values);
+    ASSERT_LE(num_included_values, num_all_values);
 
     table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
     session_ = CreateSession();
@@ -1313,27 +1368,6 @@ class BloomFilterPredicateTest : public PredicateTest {
     num_false_positive_values_ = num_all_values * kBloomFilterFalsePositiveProb;
   }
 
-  template<typename BloomFilterType, typename Collection>
-  static void InsertValues(BloomFilterType* bloom_filter, const Collection& values) {
-    for (const auto& v : values) {
-      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
-    }
-  }
-
-  template<class Collection>
-  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
-    KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
-    InsertValues(bloom_filter, values);
-    return bloom_filter;
-  }
-
-  template<class Collection>
-  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
-    unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
-    InsertValues(bloom_filter.get(), values);
-    return bloom_filter;
-  }
-
   void InsertAllValuesInTable() {
     int i = 0;
     for (auto value : all_values_) {
@@ -1442,6 +1476,28 @@ TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
   TestWithRangePredicate(included_predicate_clone1, included_predicate_clone2);
 }
 
+// Test to verify that an ineffective Bloom filter predicate will be disabled.
+TEST_F(BloomFilterPredicateTest, TestDisabledBloomFilterPredicate) {
+  // Insert all values from the table in Bloom filter so that the predicate
+  // is determined to be ineffective.
+  Init(10000/*num_all_values*/, 10000/*num_included_values*/, 0/*num_excluded_values*/);
+  KuduBloomFilter* included_bf = CreateBloomFilterWithValues(included_values_);
+
+  InsertAllValuesInTable();
+
+  vector<KuduBloomFilter*> included_bf_vec = { included_bf };
+  auto* included_predicate =
+      table_->NewInBloomFilterPredicate("value", &included_bf_vec);
+
+  ASSERT_TRUE(included_bf_vec.empty());
+  FLAGS_predicate_effectivess_num_skip_blocks = 4;
+  int actual_count_included = CountRows(table_, { included_predicate });
+  ASSERT_EQ(included_values_.size(), actual_count_included);
+
+  // CountRows() runs 2 scans using a cloned predicate.
+  CheckDisabledPredicates(GetTheOnlyTabletId(), 2 /* expected_disabled_predicates */);
+}
+
 // Benchmark test that combines Bloom filter predicate with range predicate.
 TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
   SKIP_IF_SLOW_NOT_ALLOWED();
@@ -1459,6 +1515,83 @@ TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicateBenchmark) {
   TestWithRangePredicate(included_predicate, included_predicate_clone);
 }
 
+class ParameterizedBloomFilterPredicateTest :
+    public PredicateTest,
+    public ::testing::WithParamInterface<std::tuple<bool, bool>> {};
+
+INSTANTIATE_TEST_CASE_P(, ParameterizedBloomFilterPredicateTest,
+                        ::testing::Combine(::testing::Bool(),
+                                           ::testing::Bool()));
+
+// Test to verify that an ineffective Bloom filter predicate will be disabled
+// using a pattern of repeated strings.
+TEST_P(ParameterizedBloomFilterPredicateTest, TestDisabledBloomFilterWithRepeatedStrings) {
+  // Combination of following 2 flags help test cases with MRSs flushed, DRS and delta files.
+  const bool flush_tablet = std::get<0>(GetParam());
+  const bool update_rows = std::get<1>(GetParam());
+
+  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::STRING);
+  shared_ptr<KuduSession> session = CreateSession();
+
+  // Use a set of predetermined strings and populate the table.
+  // Create a BF predicate with same set of strings.
+  deque<string> values = {"Alice", "Bob", "Charlie", "Doug", "Elizabeth", "Frank", "George",
+                          "Harry"};
+
+  // Populate table with a small set of strings that are repeated.
+  static constexpr int kNumRows = 10000;
+  auto upsert_rows = [&]() {
+    int i = 0;
+    while (i < kNumRows) {
+      for (int j = 0; j < values.size() && i < kNumRows; j++, i++) {
+        const string &value = values[j];
+        unique_ptr<KuduUpsert> upsert(table->NewUpsert());
+        ASSERT_OK(upsert->mutable_row()->SetInt64("key", i));
+        ASSERT_OK(upsert->mutable_row()->SetStringNoCopy("value", value));
+        ASSERT_OK(session->Apply(upsert.release()));
+      }
+      // TSAN builds timeout and fail on flushing with large number of rows.
+      if ((i % (kNumRows / 10))  == 0) {
+        ASSERT_OK(session->Flush());
+      }
+    }
+    ASSERT_OK(session->Flush());
+  };
+
+  upsert_rows();
+  const string tablet_id = GetTheOnlyTabletId();
+
+  if (flush_tablet) {
+    ASSERT_OK(cluster_->FlushTablet(tablet_id));
+  }
+
+  if (update_rows) {
+    string first_name = values.front();
+    values.pop_front();
+    values.push_back(first_name);
+
+    upsert_rows();
+    if (flush_tablet) {
+      ASSERT_OK(cluster_->FlushTablet(tablet_id));
+    }
+  }
+
+  // Create Bloom filter predicate with string values.
+  auto* bf = CreateBloomFilter(values.size());
+  for (const auto& value : values) {
+    bf->Insert(Slice(value));
+  }
+  vector<KuduBloomFilter*> bf_vec = { bf };
+  auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
+  ASSERT_TRUE(bf_vec.empty());
+
+  FLAGS_predicate_effectivess_num_skip_blocks = 4;
+  int actual_count_included = DoCountRows(table, { bf_predicate });
+  ASSERT_EQ(kNumRows, actual_count_included);
+
+  CheckDisabledPredicates(tablet_id, 1 /* expected_disabled_predicates */);
+}
+
 class ParameterizedPredicateTest : public PredicateTest,
   public ::testing::WithParamInterface<KuduColumnSchema::DataType> {};
 
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index 4ffe9c4..b9aa87f 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -53,6 +53,7 @@ set(COMMON_SRCS
   partial_row.cc
   partition.cc
   partition_pruner.cc
+  predicate_effectiveness.cc
   rowblock.cc
   row_changelist.cc
   row_operations.cc
diff --git a/src/kudu/common/column_materialization_context.h b/src/kudu/common/column_materialization_context.h
index 9f98b7d..899e6bd 100644
--- a/src/kudu/common/column_materialization_context.h
+++ b/src/kudu/common/column_materialization_context.h
@@ -116,8 +116,9 @@ class ColumnMaterializationContext {
     // correct status will be set.
     kNotSet = 0,
 
-    // May be set before scanning if the decoder eval flag is set to false or
-    // if iterator has deltas associated with it.
+    // May be set before scanning if the decoder eval flag is set to false
+    // or if iterator has deltas associated with it or if the predicate is
+    // determined to be ineffective.
     // May be set by decoder during scan if decoder eval is not supported.
     // Once set, scanning will materialize the entire column into the block,
     // leaving evaluation for after the scan.
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 0222f07..6713a09 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -43,15 +43,20 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/key_encoder.h"
+#include "kudu/common/predicate_effectiveness.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/mathlimits.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/block_bloom_filter.h"
+#include "kudu/util/hash.pb.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/random.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
@@ -61,6 +66,7 @@ DEFINE_int32(num_lists, 3, "Number of lists to merge");
 DEFINE_int32(num_rows, 1000, "Number of entries per list");
 DEFINE_int32(num_iters, 1, "Number of times to run merge");
 DECLARE_bool(materializing_iterator_do_pushdown);
+DECLARE_int32(predicate_effectivess_num_skip_blocks);
 
 using std::map;
 using std::pair;
@@ -633,6 +639,15 @@ class DummyIterator : public RowwiseIterator {
   Schema schema_;
 };
 
+// Verify the vectors of ColumnPredicate are the same.
+void CheckColumnPredicatesAreEqual(const vector<ColumnPredicate>& expected,
+                                   const vector<const ColumnPredicate*>& actual) {
+  ASSERT_EQ(expected.size(), actual.size());
+  for (int i = 0; i < expected.size(); i++) {
+    ASSERT_EQ(expected[i], *actual[i]);
+  }
+}
+
 TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) {
   Schema schema({ ColumnSchema("a_int64", INT64),
                   ColumnSchema("b_int64", INT64),
@@ -653,8 +668,9 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) {
 
     unique_ptr<RowwiseIterator> iter(new DummyIterator(schema));
     ASSERT_OK(InitAndMaybeWrap(&iter, &spec));
-    ASSERT_EQ(GetIteratorPredicatesForTests(iter),
-              vector<ColumnPredicate>({ c_equality, b_equality, a_range }));
+    NO_FATALS(CheckColumnPredicatesAreEqual(
+        vector<ColumnPredicate>({ c_equality, b_equality, a_range }),
+        GetIteratorPredicatesForTests(iter)));
   }
 
   { // Test that smaller columns come before larger ones, and ties are broken by idx.
@@ -666,9 +682,152 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluationOrder) {
     unique_ptr<RowwiseIterator> iter(new DummyIterator(schema));
     ASSERT_OK(InitAndMaybeWrap(&iter, &spec));
 
-    ASSERT_EQ(GetIteratorPredicatesForTests(iter),
-              vector<ColumnPredicate>({ c_equality, a_equality, b_equality }));
+    NO_FATALS(CheckColumnPredicatesAreEqual(
+        vector<ColumnPredicate>({ c_equality, a_equality, b_equality }),
+        GetIteratorPredicatesForTests(iter)));
+  }
+}
+
+// Class to test column predicate effectiveness.
+class PredicateEffectivenessTest :
+    public KuduTest,
+    public ::testing::WithParamInterface<bool> {
+ public:
+  // For 'all_values' true case, initialize the Bloom filter 'bf' with all values that are
+  // added to the output 'ints' vector. Otherwise insert subset of the values in the Bloom filter.
+  static ColumnPredicate CreateBloomFilterPredicate(bool all_values, BlockBloomFilter* bf,
+                                                    vector<int64_t>* ints) {
+    CHECK_OK(bf->Init(BlockBloomFilter::MinLogSpace(kNumRows, 0.01), FAST_HASH, 0));
+    ints->resize(kNumRows);
+
+    for (int64_t i = 0; i < kNumRows; i++) {
+      (*ints)[i] = i;
+      if (all_values || i < kSubsetNums) {
+        bf->Insert(Slice(reinterpret_cast<uint8 *>(&i), sizeof(i)));
+      }
+    }
+
+    vector<BlockBloomFilter*> bloom_filters;
+    bloom_filters.push_back(bf);
+    auto bf_pred = ColumnPredicate::InBloomFilter(kIntSchema.column(0), bloom_filters, nullptr,
+                                                  nullptr);
+    LOG(INFO) << "Bloom filter predicate: " << bf_pred.ToString();
+    return bf_pred;
+  }
+
+  // For 'all_values' true case, verify predicate effectiveness for the iterator 'iter'
+  // when the Bloom filter predicate was initialized with all values being iterated.
+  // Otherwise verify for the case when Bloom filter was initialized with subset of values
+  // being iterated.
+  static void VerifyPredicateEffectiveness(bool all_values,
+                                           const unique_ptr<RowwiseIterator>& iter) {
+    ASSERT_EQ(1, GetIteratorPredicateEffectivenessCtxForTests(iter).num_predicate_ctxs())
+        << "Predicate effectiveness contexts must match with number of predicates";
+    ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled)
+        << "Predicate must be enabled to begin with";
+
+    Arena arena(1024);
+    FLAGS_predicate_effectivess_num_skip_blocks = 4;
+    if (all_values) {
+      for (int i = 0; i < kNumRows / kBatchSize; i++) {
+        RowBlock dst(&kIntSchema, kBatchSize, &arena);
+        ASSERT_OK(iter->NextBlock(&dst));
+        ASSERT_EQ(kBatchSize, dst.nrows());
+        ASSERT_EQ(kBatchSize, dst.selection_vector()->CountSelected());
+        // For all values case, the predicate gets disabled on first effectiveness check.
+        if (i >= FLAGS_predicate_effectivess_num_skip_blocks) {
+          ASSERT_FALSE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled);
+        }
+      }
+    } else {
+      for (int i = 0; i < kNumRows / kBatchSize; i++) {
+        RowBlock dst(&kIntSchema, kBatchSize, &arena);
+        ASSERT_OK(iter->NextBlock(&dst));
+        ASSERT_EQ(kBatchSize, dst.nrows());
+        // For subset case, the predicate should never be disabled.
+        ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled);
+        auto rows_selected = dst.selection_vector()->CountSelected();
+        if (i == 0) {
+          ASSERT_EQ(kBatchSize, rows_selected);
+        } else {
+          ASSERT_EQ(0, rows_selected);
+        }
+      }
+    }
   }
+
+ private:
+  static constexpr int kNumRows = 1000;
+  static constexpr int kSubsetNums = 100;
+  static constexpr int kBatchSize = 100;
+};
+
+// Despite being a static constexpr integer, this static variable needs explicit definition
+// outside to avoid linker error because it's used with ASSERT_EQ() macro that binds 1st variable
+// to const reference.
+constexpr int PredicateEffectivenessTest::kBatchSize;
+
+// Test effectiveness of Bloom filter predicate with PredicateEvaluatingIterator.
+TEST_P(PredicateEffectivenessTest, PredicateEvaluatingIterator) {
+  // For 'all_values' true case, initialize the Bloom filter with all values that are inserted in
+  // the iterator. This helps test the case of ineffective Bloom filter predicate.
+  // For 'all_values' false case i.e. subset values case, initialize the Bloom filter with a subset
+  // of values inserted in the iterator. This helps test the case of effective Bloom filter
+  // predicate that filters out rows.
+  bool all_values = GetParam();
+  BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+  vector<int64_t> ints;
+  auto bf_pred = CreateBloomFilterPredicate(all_values, &bf, &ints);
+
+  ScanSpec spec;
+  spec.AddPredicate(bf_pred);
+
+  // Set up a MaterializingIterator with pushdown disabled, so that the
+  // PredicateEvaluatingIterator will wrap it and do evaluation.
+  unique_ptr<VectorIterator> colwise(new VectorIterator(ints));
+  FLAGS_materializing_iterator_do_pushdown = false;
+  unique_ptr<RowwiseIterator> materializing(NewMaterializingIterator(std::move(colwise)));
+
+  // Wrap it in another iterator to do the evaluation
+  const RowwiseIterator* mat_iter_addr = materializing.get();
+  unique_ptr<RowwiseIterator> outer_iter(std::move(materializing));
+  ASSERT_OK(InitAndMaybeWrap(&outer_iter, &spec));
+
+  ASSERT_NE(reinterpret_cast<uintptr_t>(outer_iter.get()),
+            reinterpret_cast<uintptr_t>(mat_iter_addr))
+      << "Iterator pointer should differ after wrapping";
+  ASSERT_EQ(0, spec.predicates().size()) << "Iterator tree should have accepted predicate";
+  ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size())
+      << "Predicate should be evaluated by the outer iterator";
+  VerifyPredicateEffectiveness(all_values, outer_iter);
+}
+
+// Test effectiveness of Bloom filter predicate with MaterializingIterator.
+TEST_P(PredicateEffectivenessTest, MaterializingIterator) {
+  // For 'all_values' true case, initialize the Bloom filter with all values that are inserted in
+  // the iterator. This helps test the case of ineffective Bloom filter predicate.
+  // For 'all_values' false case i.e. subset values case, initialize the Bloom filter with a subset
+  // of values inserted in the iterator. This helps test the case of effective Bloom filter
+  // predicate that filters out rows.
+  bool all_values = GetParam();
+  BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+  vector<int64_t> ints;
+  auto bf_pred = CreateBloomFilterPredicate(all_values, &bf, &ints);
+
+  ScanSpec spec;
+  spec.AddPredicate(bf_pred);
+
+  // Setup a materializing iterator with pushdown enabled(default).
+  unique_ptr<VectorIterator> colwise(new VectorIterator(ints));
+  unique_ptr<RowwiseIterator> mat_iter(
+      NewMaterializingIterator(std::move(colwise)));
+
+  ASSERT_OK(mat_iter->Init(&spec));
+  ASSERT_EQ(0, spec.predicates().size()) << "Iterator tree should have accepted predicate";
+
+  VerifyPredicateEffectiveness(all_values, mat_iter);
 }
 
+INSTANTIATE_TEST_CASE_P(, PredicateEffectivenessTest, ::testing::Bool());
+
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index b865f79..d1301aa 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -29,6 +29,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <typeinfo>
 #include <unordered_map>
 #include <utility>
 
@@ -45,6 +46,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator.h"
 #include "kudu/common/iterator_stats.h"
+#include "kudu/common/predicate_effectiveness.h"
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
@@ -68,7 +70,6 @@ template <class T> struct compare;
 
 using std::deque;
 using std::get;
-using std::pair;
 using std::sort;
 using std::string;
 using std::unique_ptr;
@@ -96,6 +97,9 @@ void AddIterStats(const RowwiseIterator& iter,
 }
 } // anonymous namespace
 
+
+typedef std::pair<int32_t, ColumnPredicate> ColumnIdxAndPredicate;
+
 ////////////////////////////////////////////////////////////
 // MergeIterator
 ////////////////////////////////////////////////////////////
@@ -1138,10 +1142,16 @@ class MaterializingIterator : public RowwiseIterator {
 
   virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
     iter_->GetIteratorStats(stats);
+    predicates_effectiveness_ctx_.PopulateIteratorStatsWithDisabledPredicates(
+        col_idx_predicates_, stats);
   }
 
   virtual Status NextBlock(RowBlock* dst) OVERRIDE;
 
+  const IteratorPredicateEffectivenessContext& effectiveness_context() const {
+    return predicates_effectiveness_ctx_;
+  }
+
  private:
   Status MaterializeBlock(RowBlock *dst);
 
@@ -1149,11 +1159,14 @@ class MaterializingIterator : public RowwiseIterator {
 
   // List of (column index, predicate) in order of most to least selective, with
   // ties broken by the index.
-  vector<std::pair<int32_t, ColumnPredicate>> col_idx_predicates_;
+  vector<ColumnIdxAndPredicate> col_idx_predicates_;
 
   // List of column indexes without predicates to materialize.
   vector<int32_t> non_predicate_column_indexes_;
 
+  // Predicate effective contexts help disable ineffective column predicates.
+  IteratorPredicateEffectivenessContext predicates_effectiveness_ctx_;
+
   // Set only by test code to disallow pushdown.
   bool disallow_pushdown_for_tests_;
   bool disallow_decoder_eval_;
@@ -1171,7 +1184,6 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
   const int32_t num_columns = schema().num_columns();
   col_idx_predicates_.clear();
   non_predicate_column_indexes_.clear();
-
   if (PREDICT_FALSE(!disallow_pushdown_for_tests_) && spec != nullptr) {
     col_idx_predicates_.reserve(spec->predicates().size());
     DCHECK_GE(num_columns, spec->predicates().size());
@@ -1205,12 +1217,20 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
   // Sort the predicates by selectivity so that the most selective are evaluated
   // earlier, with ties broken by the column index.
   sort(col_idx_predicates_.begin(), col_idx_predicates_.end(),
-       [] (const pair<int32_t, ColumnPredicate>& left,
-           const pair<int32_t, ColumnPredicate>& right) {
+       [] (const ColumnIdxAndPredicate& left,
+           const ColumnIdxAndPredicate& right) {
           int comp = SelectivityComparator(left.second, right.second);
           return comp ? comp < 0 : left.first < right.first;
        });
 
+  // Important to initialize the effectiveness contexts after sorting the predicates
+  // to get right order of predicate indices.
+  for (int pred_idx = 0; pred_idx < col_idx_predicates_.size(); pred_idx++) {
+    const auto* predicate = &col_idx_predicates_[pred_idx].second;
+    if (IsColumnPredicateDisableable(predicate->predicate_type())) {
+      predicates_effectiveness_ctx_.AddDisableablePredicate(pred_idx, predicate);
+    }
+  }
   return Status::OK();
 }
 
@@ -1244,21 +1264,58 @@ Status MaterializingIterator::MaterializeBlock(RowBlock *dst) {
     return Status::OK();
   }
 
-  for (const auto& col_pred : col_idx_predicates_) {
+  predicates_effectiveness_ctx_.IncrementNextBlockCount();
+  for (int i = 0; i < col_idx_predicates_.size(); i++) {
+    const auto& col_pred = col_idx_predicates_[i];
+    const auto& col_idx = get<0>(col_pred);
+    const auto& predicate = get<1>(col_pred);
+
     // Materialize the column itself into the row block.
-    ColumnBlock dst_col(dst->column_block(get<0>(col_pred)));
-    ColumnMaterializationContext ctx(get<0>(col_pred),
-                                     &get<1>(col_pred),
+    ColumnBlock dst_col(dst->column_block(col_idx));
+    ColumnMaterializationContext ctx(col_idx,
+                                     &predicate,
                                      &dst_col,
                                      dst->selection_vector());
     // None predicates should be short-circuited in scan spec.
     DCHECK(ctx.pred()->predicate_type() != PredicateType::None);
-    if (disallow_decoder_eval_) {
+    auto* effectiveness_ctx = IsColumnPredicateDisableable(predicate.predicate_type()) ?
+                              &predicates_effectiveness_ctx_[i] : nullptr;
+    // Predicate evaluation will be disabled for a predicate already determined to be ineffective
+    // both at decoder level and outside.
+    //
+    // Indicate whether the predicate has been disabled or not. If the predicate is not disableable
+    // (currently only Bloom filter predicates are disableable), both of these are false.
+    bool disableable_predicate_disabled =
+        effectiveness_ctx && !effectiveness_ctx->IsPredicateEnabled();
+    bool disableable_predicate_enabled =
+        effectiveness_ctx && effectiveness_ctx->IsPredicateEnabled();
+
+    if (disallow_decoder_eval_ || disableable_predicate_disabled) {
+      // This column predicate is determined to be ineffective, hence disable the decoder level
+      // evaluation.
       ctx.SetDecoderEvalNotSupported();
     }
+
+    // Determine the number of rows filtered out by this predicate, if disableable.
+    //
+    // Currently we don't have a mechanism to get precise number of rows filtered out by a
+    // particular predicate and adding such a stat could in fact slow down the filtering.
+    // This count of rows filtered out is not precise as the rows filtered out by predicates
+    // earlier in the sort order get more credit than the ones later in the order. Nevertheless it
+    // still helps measure whether the predicate is effective in filtering out rows.
+    auto num_rows_before = disableable_predicate_enabled ?
+                           dst->selection_vector()->CountSelected() : 0;
+
     RETURN_NOT_OK(iter_->MaterializeColumn(&ctx));
-    if (ctx.DecoderEvalNotSupported()) {
-      get<1>(col_pred).Evaluate(dst_col, dst->selection_vector());
+    if (ctx.DecoderEvalNotSupported() && !disableable_predicate_disabled) {
+      predicate.Evaluate(dst_col, dst->selection_vector());
+    }
+    if (disableable_predicate_enabled) {
+      auto num_rows_rejected = num_rows_before - dst->selection_vector()->CountSelected();
+      DCHECK_GE(num_rows_rejected, 0);
+      DCHECK_LE(num_rows_rejected, num_rows_before);
+      effectiveness_ctx->rows_rejected += num_rows_rejected;
+      effectiveness_ctx->rows_read += dst->nrows();
     }
 
     // If after evaluating this predicate the entire row block has been filtered
@@ -1268,6 +1325,7 @@ Status MaterializingIterator::MaterializeBlock(RowBlock *dst) {
       return Status::OK();
     }
   }
+  predicates_effectiveness_ctx_.DisableIneffectivePredicates();
 
   for (size_t col_idx : non_predicate_column_indexes_) {
     // Materialize the column itself into the row block.
@@ -1325,16 +1383,36 @@ class PredicateEvaluatingIterator : public RowwiseIterator {
 
   virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE {
     base_iter_->GetIteratorStats(stats);
+    predicates_effectiveness_ctx_.PopulateIteratorStatsWithDisabledPredicates(
+        col_idx_predicates_, stats);
   }
 
-  const vector<ColumnPredicate>& col_predicates() const { return col_predicates_; }
+  // Return the column predicates tracked by this iterator. Only valid for the lifetime of
+  // the iterator.
+  vector<const ColumnPredicate*> col_predicates() const {
+    vector<const ColumnPredicate*> result;
+    result.reserve(col_idx_predicates_.size());
+    std::transform(col_idx_predicates_.begin(),
+                   col_idx_predicates_.end(),
+                   std::back_inserter(result),
+                   [](const ColumnIdxAndPredicate& idx_and_pred) { return &idx_and_pred.second; });
+    return result;
+  }
+
+  const IteratorPredicateEffectivenessContext& effectiveness_context() const {
+    return predicates_effectiveness_ctx_;
+  }
 
  private:
+
   unique_ptr<RowwiseIterator> base_iter_;
 
-  // List of predicates in order of most to least selective, with
+  // List of (column index, predicate) in order of most to least selective, with
   // ties broken by the column index.
-  vector<ColumnPredicate> col_predicates_;
+  vector<ColumnIdxAndPredicate> col_idx_predicates_;
+
+  // Predicate effective contexts help disable ineffective column predicates.
+  IteratorPredicateEffectivenessContext predicates_effectiveness_ctx_;
 };
 
 PredicateEvaluatingIterator::PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter)
@@ -1347,23 +1425,39 @@ Status PredicateEvaluatingIterator::Init(ScanSpec *spec) {
 
   // Gather any predicates that the base iterator did not pushdown, and remove
   // the predicates from the spec.
-  col_predicates_.clear();
-  col_predicates_.reserve(spec->predicates().size());
-  for (auto& predicate : spec->predicates()) {
-    col_predicates_.emplace_back(predicate.second);
+  col_idx_predicates_.clear();
+  col_idx_predicates_.reserve(spec->predicates().size());
+  for (const auto& col_pred : spec->predicates()) {
+    const auto& col_name = col_pred.first;
+    const ColumnPredicate& pred = col_pred.second;
+    DCHECK_EQ(col_name, pred.column().name());
+    int col_idx = schema().find_column(col_name);
+    if (col_idx == Schema::kColumnNotFound) {
+      return Status::InvalidArgument("No such column", col_name);
+    }
+    VLOG(1) << "Pushing down predicate " << pred.ToString();
+    col_idx_predicates_.emplace_back(col_idx, pred);
   }
   spec->RemovePredicates();
 
   // Sort the predicates by selectivity so that the most selective are evaluated
   // earlier, with ties broken by the column index.
-  sort(col_predicates_.begin(), col_predicates_.end(),
-       [&] (const ColumnPredicate& left, const ColumnPredicate& right) {
-          int comp = SelectivityComparator(left, right);
-          if (comp != 0) return comp < 0;
-          return schema().find_column(left.column().name())
-               < schema().find_column(right.column().name());
+  sort(col_idx_predicates_.begin(), col_idx_predicates_.end(),
+       [] (const ColumnIdxAndPredicate& left,
+           const ColumnIdxAndPredicate& right) {
+         int comp = SelectivityComparator(left.second, right.second);
+         return comp ? comp < 0 : left.first < right.first;
        });
 
+  // Important to initialize the effectiveness contexts after sorting the predicates
+  // to get right order of predicate indices.
+  for (int pred_idx = 0; pred_idx < col_idx_predicates_.size(); pred_idx++) {
+    const auto* predicate = &col_idx_predicates_[pred_idx].second;
+    if (IsColumnPredicateDisableable(predicate->predicate_type())) {
+      predicates_effectiveness_ctx_.AddDisableablePredicate(pred_idx, predicate);
+    }
+  }
+
   return Status::OK();
 }
 
@@ -1374,19 +1468,46 @@ bool PredicateEvaluatingIterator::HasNext() const {
 Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) {
   RETURN_NOT_OK(base_iter_->NextBlock(dst));
 
-  for (const auto& predicate : col_predicates_) {
-    int32_t col_idx = dst->schema()->find_column(predicate.column().name());
-    if (col_idx == Schema::kColumnNotFound) {
-      return Status::InvalidArgument("Unknown column in predicate", predicate.ToString());
+  predicates_effectiveness_ctx_.IncrementNextBlockCount();
+  for (int i = 0; i < col_idx_predicates_.size(); i++) {
+    const auto& col_idx = col_idx_predicates_[i].first;
+    const auto& predicate = col_idx_predicates_[i].second;
+    DCHECK_NE(col_idx, Schema::kColumnNotFound);
+
+    auto* effectiveness_ctx = IsColumnPredicateDisableable(predicate.predicate_type()) ?
+                              &predicates_effectiveness_ctx_[i] : nullptr;
+    if (effectiveness_ctx && !effectiveness_ctx->IsPredicateEnabled()) {
+      // Column predicate determined to be disabled.
+      continue;
     }
+
+    // Determine the number of rows filtered out by this predicate.
+    //
+    // Currently we don't have a mechanism to get precise number of rows filtered out by a
+    // particular predicate and adding such stat could in fact slow down the filtering.
+    // This count of rows filtered out is not precise as the rows filtered out by predicates
+    // earlier in the sort order get more credit than the ones later in the order. Nevertheless it
+    // still helps measure whether the predicate is effective in filtering out rows.
+    auto num_rows_before = effectiveness_ctx ?
+                           dst->selection_vector()->CountSelected() : 0;
+
     predicate.Evaluate(dst->column_block(col_idx), dst->selection_vector());
 
+    if (effectiveness_ctx) {
+      auto num_rows_rejected = num_rows_before - dst->selection_vector()->CountSelected();
+      DCHECK_GE(num_rows_rejected, 0);
+      DCHECK_LE(num_rows_rejected, num_rows_before);
+      effectiveness_ctx->rows_rejected += num_rows_rejected;
+      effectiveness_ctx->rows_read += dst->nrows();
+    }
+
     // If after evaluating this predicate, the entire row block has now been
     // filtered out, we don't need to evaluate any further predicates.
     if (!dst->selection_vector()->AnySelected()) {
       break;
     }
   }
+  predicates_effectiveness_ctx_.DisableIneffectivePredicates();
 
   return Status::OK();
 }
@@ -1408,11 +1529,29 @@ Status InitAndMaybeWrap(unique_ptr<RowwiseIterator>* base_iter,
   return Status::OK();
 }
 
-const vector<ColumnPredicate>& GetIteratorPredicatesForTests(
+vector<const ColumnPredicate*> GetIteratorPredicatesForTests(
     const unique_ptr<RowwiseIterator>& iter) {
   PredicateEvaluatingIterator* pred_eval =
       down_cast<PredicateEvaluatingIterator*>(iter.get());
   return pred_eval->col_predicates();
 }
 
+const IteratorPredicateEffectivenessContext& GetIteratorPredicateEffectivenessCtxForTests(
+    const std::unique_ptr<RowwiseIterator>& iter) {
+  auto* iter_ptr = iter.get();
+  // Using dynamic_cast like below is not recommended but okay considering following reasons:
+  // - This function is only used for tests
+  // - PredicateEvaluatingIterator and MaterializingIterator are hidden from public access.
+  // - Introducing effectiveness context for base RowwiseIterator would be unnecessary
+  //   for other derived classes of RowwiseIterator.
+  if (auto* pred_iter = dynamic_cast<PredicateEvaluatingIterator*>(iter_ptr)) {
+    return pred_iter->effectiveness_context();
+  }
+  if (auto* pred_iter = dynamic_cast<MaterializingIterator*>(iter_ptr)) {
+    return pred_iter->effectiveness_context();
+  }
+  LOG(FATAL) << "Effectiveness context not available for iterator type: "
+             << typeid(iter_ptr).name();
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h
index 92462c2..0b0add8 100644
--- a/src/kudu/common/generic_iterators.h
+++ b/src/kudu/common/generic_iterators.h
@@ -81,7 +81,7 @@ Status InitAndMaybeWrap(std::unique_ptr<RowwiseIterator>* base_iter,
 // Gets the predicates associated with a PredicateEvaluatingIterator.
 //
 // Only for use by tests.
-const std::vector<ColumnPredicate>& GetIteratorPredicatesForTests(
+std::vector<const ColumnPredicate*> GetIteratorPredicatesForTests(
     const std::unique_ptr<RowwiseIterator>& iter);
 
 } // namespace kudu
diff --git a/src/kudu/common/iterator_stats.cc b/src/kudu/common/iterator_stats.cc
index 878e9bb..77f4961 100644
--- a/src/kudu/common/iterator_stats.cc
+++ b/src/kudu/common/iterator_stats.cc
@@ -29,18 +29,20 @@ using strings::Substitute;
 IteratorStats::IteratorStats()
     : cells_read(0),
       bytes_read(0),
-      blocks_read(0) {
+      blocks_read(0),
+      predicates_disabled(0) {
 }
 
 string IteratorStats::ToString() const {
-  return Substitute("cells_read=$0 bytes_read=$1 blocks_read=$2",
-                    cells_read, bytes_read, blocks_read);
+  return Substitute("cells_read=$0 bytes_read=$1 blocks_read=$2 predicates_disabled=$3",
+                    cells_read, bytes_read, blocks_read, predicates_disabled);
 }
 
 IteratorStats& IteratorStats::operator+=(const IteratorStats& other) {
   cells_read += other.cells_read;
   bytes_read += other.bytes_read;
   blocks_read += other.blocks_read;
+  predicates_disabled += other.predicates_disabled;
   DCheckNonNegative();
   return *this;
 }
@@ -49,6 +51,7 @@ IteratorStats& IteratorStats::operator-=(const IteratorStats& other) {
   cells_read -= other.cells_read;
   bytes_read -= other.bytes_read;
   blocks_read -= other.blocks_read;
+  predicates_disabled -= other.predicates_disabled;
   DCheckNonNegative();
   return *this;
 }
@@ -69,5 +72,6 @@ void IteratorStats::DCheckNonNegative() const {
   DCHECK_GE(cells_read, 0);
   DCHECK_GE(bytes_read, 0);
   DCHECK_GE(blocks_read, 0);
+  DCHECK_GE(predicates_disabled, 0);
 }
 } // namespace kudu
diff --git a/src/kudu/common/iterator_stats.h b/src/kudu/common/iterator_stats.h
index df0b69e..2197423 100644
--- a/src/kudu/common/iterator_stats.h
+++ b/src/kudu/common/iterator_stats.h
@@ -37,6 +37,12 @@ struct IteratorStats {
   // The number of CFile data blocks read from disk (or cache) by the iterator.
   int64_t blocks_read;
 
+  // The number of column predicates disabled because they were determined to be
+  // ineffective.
+  // There is only one predicate per column (if any) so for a per column stat this would be 0 or 1.
+  // Using an integer helps the stat work well when aggregating or computing delta.
+  int64_t predicates_disabled;
+
   // Add statistics contained 'other' to this object (for each field
   // in this object, increment it by the value of the equivalent field
   // in 'other').
diff --git a/src/kudu/common/predicate_effectiveness.cc b/src/kudu/common/predicate_effectiveness.cc
new file mode 100644
index 0000000..6c78bab
--- /dev/null
+++ b/src/kudu/common/predicate_effectiveness.cc
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/predicate_effectiveness.h"
+
+#include <ostream>
+
+#include <gflags/gflags.h>
+
+#include "kudu/common/iterator_stats.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/flag_tags.h"
+
+using std::pair;
+using std::vector;
+
+DEFINE_bool(predicate_effectivess_enabled, true,
+            "Should ineffective predicate filtering be disabled");
+TAG_FLAG(predicate_effectivess_enabled, advanced);
+TAG_FLAG(predicate_effectivess_enabled, runtime);
+
+DEFINE_double(predicate_effectivess_reject_ratio, 0.10,
+              "Rejection ratio below which predicate filter is considered ineffective");
+TAG_FLAG(predicate_effectivess_reject_ratio, advanced);
+TAG_FLAG(predicate_effectivess_reject_ratio, runtime);
+
+DEFINE_int32(predicate_effectivess_num_skip_blocks, 128,
+             "Number of blocks to skip for every predicate effectiveness check");
+TAG_FLAG(predicate_effectivess_num_skip_blocks, advanced);
+TAG_FLAG(predicate_effectivess_num_skip_blocks, runtime);
+
+namespace kudu {
+
+bool IsColumnPredicateDisableable(PredicateType type) {
+  return FLAGS_predicate_effectivess_enabled &&
+         type == PredicateType::InBloomFilter;
+}
+
+bool PredicateEffectivenessContext::IsPredicateEnabled() const {
+  return !FLAGS_predicate_effectivess_enabled || enabled;
+}
+
+void IteratorPredicateEffectivenessContext::DisableIneffectivePredicates() {
+  if (!FLAGS_predicate_effectivess_enabled ||
+      (next_block_count_ % FLAGS_predicate_effectivess_num_skip_blocks) != 0) {
+    return;
+  }
+
+  for (auto& idx_and_pred_ctx : predicate_ctxs_) {
+    auto& pred_ctx = idx_and_pred_ctx.second;
+    DCHECK(IsColumnPredicateDisableable(pred_ctx.pred->predicate_type()));
+    if (!pred_ctx.enabled) {
+      // Predicate already disabled.
+      continue;
+    }
+    DCHECK(pred_ctx.rows_read != 0);
+    auto rejection_ratio = static_cast<double>(pred_ctx.rows_rejected) / pred_ctx.rows_read;
+    DCHECK_LE(rejection_ratio, 1.0);
+    if (rejection_ratio < FLAGS_predicate_effectivess_reject_ratio) {
+      pred_ctx.enabled = false;
+      VLOG(1) << "Disabling column predicate " << pred_ctx.pred->ToString();
+    }
+  }
+}
+
+void IteratorPredicateEffectivenessContext::PopulateIteratorStatsWithDisabledPredicates(
+    const vector<pair<int32_t, ColumnPredicate>>& col_idx_predicates,
+    vector<IteratorStats>* stats) const {
+
+  for (const auto& pred_idx_and_eff_ctx : predicate_ctxs_) {
+    const auto& pred_idx = pred_idx_and_eff_ctx.first;
+    const auto& eff_ctx = pred_idx_and_eff_ctx.second;
+    const auto& col_idx = col_idx_predicates[pred_idx].first;
+    DCHECK(IsColumnPredicateDisableable(eff_ctx.pred->predicate_type()))
+      << "Incorrectly tracked non-disableable predicate: " << eff_ctx.pred->ToString();
+    DCHECK_LT(col_idx, stats->size());
+    (*stats)[col_idx].predicates_disabled = !eff_ctx.enabled;
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/common/predicate_effectiveness.h b/src/kudu/common/predicate_effectiveness.h
new file mode 100644
index 0000000..17302c8
--- /dev/null
+++ b/src/kudu/common/predicate_effectiveness.h
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/common/column_predicate.h"
+
+namespace kudu {
+class RowwiseIterator;
+struct IteratorStats;
+
+// Whether the column predicate can be checked for effectiveness and automatically disabled.
+// Currently only Bloom filter predicate qualifies.
+bool IsColumnPredicateDisableable(PredicateType type);
+
+// Per disableable column predicate effectiveness context.
+struct PredicateEffectivenessContext {
+  const ColumnPredicate* pred;
+  size_t rows_read;
+  size_t rows_rejected;
+  bool enabled;
+  explicit PredicateEffectivenessContext(const ColumnPredicate* pred)
+      : pred(pred),
+        rows_read(0),
+        rows_rejected(0),
+        enabled(true) {
+  }
+
+  // Is the column predicate tracked by this context enabled.
+  bool IsPredicateEnabled() const;
+};
+
+// Per iterator effectiveness context that wraps effectiveness contexts for multiple column
+// predicates.
+class IteratorPredicateEffectivenessContext {
+ public:
+  IteratorPredicateEffectivenessContext() : next_block_count_(0) {}
+
+  // Add a disableable column predicate to track, where 'idx' is the user supplied key.
+  void AddDisableablePredicate(int idx, const ColumnPredicate* pred) {
+    DCHECK(IsColumnPredicateDisableable(pred->predicate_type()));
+    predicate_ctxs_.emplace(idx, PredicateEffectivenessContext(pred));
+  }
+
+  void IncrementNextBlockCount() {
+    next_block_count_++;
+  }
+
+  // Get the effectiveness context associated with column predicate index 'idx'.
+  PredicateEffectivenessContext& operator[](int idx) {
+    auto it = predicate_ctxs_.find(idx);
+    CHECK(it != predicate_ctxs_.end());
+    return it->second;
+  }
+
+  const PredicateEffectivenessContext& operator[](int idx) const {
+    auto it = predicate_ctxs_.find(idx);
+    CHECK(it != predicate_ctxs_.end());
+    return it->second;
+  }
+
+  int num_predicate_ctxs() const {
+    return predicate_ctxs_.size();
+  }
+
+  // Checks effectiveness of the predicates using stats collected so far and disables
+  // ineffective disableable predicates.
+  // Heuristic: Check every FLAGS_predicate_effectivess_num_skip_blocks blocks,
+  // ratio of rows rejected by a predicate. If the rejection ratio is less than
+  // FLAGS_predicate_effectivess_reject_ratio then the predicate is disabled.
+  void DisableIneffectivePredicates();
+
+  // Populate the output 'stats' vector with disabled column predicates
+  // where indices of the vector correspond to the table's column indices.
+  //
+  // Input 'col_idx_predicates' helps map predicate index to the table's column index.
+  void PopulateIteratorStatsWithDisabledPredicates(
+      const std::vector<std::pair<int32_t, ColumnPredicate>>& col_idx_predicates,
+      std::vector<IteratorStats>* stats) const;
+
+ private:
+  int next_block_count_;
+  // Per column predicate effectiveness context where key is the index as specified
+  // in AddDisableablePredicate().
+  std::unordered_map<int, PredicateEffectivenessContext> predicate_ctxs_;
+};
+
+// Gets the predicate effectiveness context associated with the iterator.
+//
+// Only for use by tests.
+const IteratorPredicateEffectivenessContext& GetIteratorPredicateEffectivenessCtxForTests(
+    const std::unique_ptr<RowwiseIterator>& iter);
+
+} // namespace kudu
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
index 3069353..ad6c527 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -26,6 +26,7 @@
 #include "kudu/client/client.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
@@ -34,9 +35,12 @@
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/master/ts_manager.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/tablet_server_options.h"
+#include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
@@ -273,6 +277,23 @@ string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
   return JoinPathSegments(opts_.cluster_root, Substitute("ts-$0-root", idx));
 }
 
+Status InternalMiniCluster::FlushTablet(const string& tablet_id) {
+  // Flag to ensure specified tablet is found and flushed on at least one of the tablet servers.
+  bool tablet_found = false;
+  for (const auto& ts : mini_tablet_servers_) {
+    scoped_refptr<tablet::TabletReplica> tablet_replica;
+    if (ts->server()->tablet_manager()->LookupTablet(tablet_id, &tablet_replica)) {
+      tablet_found = true;
+      RETURN_NOT_OK(tablet_replica->tablet()->Flush());
+    }
+  }
+  if (!tablet_found) {
+    return Status::NotFound(Substitute("Tablet $0 not found on any of the tablet servers",
+                                       tablet_id));
+  }
+  return Status::OK();
+}
+
 Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
   vector<shared_ptr<master::TSDescriptor>> descs;
   return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
index 284da93..bf205b1 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -160,6 +160,9 @@ class InternalMiniCluster : public MiniCluster {
 
   std::string GetTabletServerFsRoot(int idx) const;
 
+  // Flush the specified tablet on all tablet replicas in the cluster.
+  Status FlushTablet(const std::string& tablet_id);
+
   // Wait until the number of registered tablet servers reaches the given
   // count on all masters. Returns Status::TimedOut if the desired count is not
   // achieved within kRegistrationWaitTimeSeconds.
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index a8cccbc..fd98049 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -79,6 +79,13 @@ METRIC_DEFINE_counter(tablet, scanner_bytes_returned, "Scanner Bytes Returned",
                       "for consumption by clients, and thus is not "
                       "a reflection of the amount of work being done by scanners.",
                       kudu::MetricLevel::kDebug);
+METRIC_DEFINE_counter(tablet, scanner_predicates_disabled, "Scanner Column Predicates Disabled",
+                      kudu::MetricUnit::kUnits,
+                      "Number of column predicates disabled during scan requests. "
+                      "This count measures the number of disableable column predicates like "
+                      "Bloom filter predicate that are automatically disabled if determined to "
+                      "be ineffective.",
+                      kudu::MetricLevel::kInfo);
 
 METRIC_DEFINE_counter(tablet, scanner_rows_scanned, "Scanner Rows Scanned",
                       kudu::MetricUnit::kRows,
@@ -365,6 +372,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     MINIT(scanner_rows_scanned),
     MINIT(scanner_cells_scanned_from_disk),
     MINIT(scanner_bytes_scanned_from_disk),
+    MINIT(scanner_predicates_disabled),
     MINIT(scans_started),
     GINIT(tablet_active_scanners),
     MINIT(bloom_lookups),
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 1975779..9081fdc 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -59,6 +59,7 @@ struct TabletMetrics {
   scoped_refptr<Counter> scanner_rows_scanned;
   scoped_refptr<Counter> scanner_cells_scanned_from_disk;
   scoped_refptr<Counter> scanner_bytes_scanned_from_disk;
+  scoped_refptr<Counter> scanner_predicates_disabled;
   scoped_refptr<Counter> scans_started;
   scoped_refptr<AtomicGauge<size_t>> tablet_active_scanners;
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 6a87a65..794d599 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -2882,6 +2882,7 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
     tablet->metrics()->scanner_rows_scanned->IncrementBy(rows_scanned);
     tablet->metrics()->scanner_cells_scanned_from_disk->IncrementBy(delta_stats.cells_read);
     tablet->metrics()->scanner_bytes_scanned_from_disk->IncrementBy(delta_stats.bytes_read);
+    tablet->metrics()->scanner_predicates_disabled->IncrementBy(delta_stats.predicates_disabled);
 
     // Last read timestamp.
     tablet->UpdateLastReadTime();