You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/10/11 22:50:51 UTC

kudu git commit: Implement BloomFilter Predicate in server side.

Repository: kudu
Updated Branches:
  refs/heads/master c8dd7b53f -> 8af288a26


Implement BloomFilter Predicate in server side.

Change-Id: I62c2de42667d0255d94e19db773240f7f9ee636c
Reviewed-on: http://gerrit.cloudera.org:8080/11100
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8af288a2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8af288a2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8af288a2

Branch: refs/heads/master
Commit: 8af288a26a204e2acfc3aa4e642fba7de56b43bb
Parents: c8dd7b5
Author: triplesheep <tr...@gmail.com>
Authored: Wed Aug 1 10:54:21 2018 +0000
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Oct 11 22:50:37 2018 +0000

----------------------------------------------------------------------
 src/kudu/common/column_predicate-test.cc | 287 ++++++++++++++++++++++++++
 src/kudu/common/column_predicate.cc      | 193 +++++++++++++++--
 src/kudu/common/column_predicate.h       | 140 ++++++++++++-
 src/kudu/common/common.proto             |  40 +++-
 src/kudu/common/key_util.cc              |   2 +
 src/kudu/common/scan_spec.cc             |   4 +
 src/kudu/common/wire_protocol-test.cc    | 128 +++++++++++-
 src/kudu/common/wire_protocol.cc         |  79 ++++++-
 src/kudu/tablet/cfile_set-test.cc        | 211 ++++++++++++++++++-
 src/kudu/util/bloom_filter.h             |  23 ++-
 10 files changed, 1068 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index d96cc41..e0b6b63 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -30,8 +30,10 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/bloom_filter.h"
 #include "kudu/util/int128.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/test_util.h"
 
@@ -41,6 +43,7 @@ namespace kudu {
 
 class TestColumnPredicate : public KuduTest {
  public:
+  TestColumnPredicate() : rand_(SeedRandom()) {}
 
   // Test that when a is merged into b and vice versa, the result is equal to
   // expected, and the resulting type is equal to type.
@@ -68,6 +71,42 @@ class TestColumnPredicate : public KuduTest {
     ASSERT_EQ(b_base.predicate_type(), type);
   }
 
+  void FillBloomFilterAndValues(int n_keys,
+                                vector<uint64_t>* values,
+                                BloomFilterBuilder* bfb1,
+                                BloomFilterBuilder* bfb2) {
+    uint64_t current = 0;
+    for (int i = 0; i < 2; ++i) {
+      while (true) {
+        uint64_t key = rand_.Next();
+        if (key <= current) {
+          continue;
+        }
+        current = key;
+        Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+        BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+        bfb1->AddKey(probe);
+        bfb2->AddKey(probe);
+        values->emplace_back(key);
+        break;
+      }
+    }
+    for (int i = 2; i < n_keys; ++i) {
+      while (true) {
+        uint64_t key = rand_.Next();
+        Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+        BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+        BloomFilter bf(bfb1->slice(), bfb1->n_hashes());
+        if (!bf.MayContainKey(probe) && key > current) {
+          current = key;
+          values->emplace_back(key);
+          bfb2->AddKey(probe);
+          break;
+        }
+      }
+    }
+  }
+
   template <typename T>
   void TestMergeCombinations(const ColumnSchema& column, vector<T> values) {
     // Range + Range
@@ -744,6 +783,184 @@ class TestColumnPredicate : public KuduTest {
               ColumnPredicate::IsNull(column),
               PredicateType::IsNull);
   }
+
+  template <typename T>
+  void TestMergeBloomFilterCombinations(const ColumnSchema& column,
+                                        vector<ColumnPredicate::BloomFilterInner>* bf,
+                                        vector<T> values) {
+    vector<ColumnPredicate::BloomFilterInner> orig_bloom_filters = *bf;
+    // BloomFilter AND
+    // NONE
+    // =
+    // NONE
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::None(column),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // Equality
+    // =
+    // Equality
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::Equality(column, &values[0]),
+              ColumnPredicate::Equality(column, &values[0]),
+              PredicateType::Equality);
+
+    // BloomFilter AND
+    // Equality
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::Equality(column, &values[2]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // IS NOT NULL
+    // =
+    // BloomFilter
+    *bf = orig_bloom_filters;
+    vector<ColumnPredicate::BloomFilterInner> bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::IsNotNull(column),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, nullptr, nullptr),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter AND
+    // IS NULL
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::IsNull(column),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // None (none of the values in the list hit the bloom filter)
+    *bf = orig_bloom_filters;
+    vector<const void*> in_list = { &values[2], &values[3], &values[4] };
+    vector<const void*> hit_list;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // InList (all of the values in the list hit the bloom filter)
+    in_list = { &values[0], &values[1] };
+    hit_list = { &values[0], &values[1] };
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::InList(column, &hit_list),
+              PredicateType::InList);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // InList (only some of the values in the list hit the bloom filter)
+    in_list = { &values[0], &values[1], &values[2], &values[3] };
+    hit_list = { &values[0], &values[1]};
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::InList(column, &hit_list),
+              PredicateType::InList);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // Equality (only the first list value hits the bloom filter, so it simplifies to Equality)
+    in_list = { &values[0], &values[2], &values[3] };
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::Equality(column, &values[0]),
+              PredicateType::Equality);
+
+    // Range AND
+    // BloomFilter
+    // =
+    // BloomFilter with lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::Range(column, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // Range
+    // =
+    // BloomFilter with lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[4]),
+              ColumnPredicate::Range(column, &values[1], &values[3]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[1], &values[3]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // Range
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
+              ColumnPredicate::Range(column, &values[2], &values[4]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // BloomFilter with lower and upper bound
+    // =
+    // BloomFilter with lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    vector<ColumnPredicate::BloomFilterInner> collect = *bf;
+    collect.insert(collect.end(), bf->begin(), bf->end());
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, &collect, &values[0], &values[4]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // BloomFilter with lower and upper bound
+    // =
+    // BloomFilter with lower and upper bound
+    *bf = orig_bloom_filters;
+    collect = *bf;
+    bf_copy = *bf;
+    collect.insert(collect.end(), bf->begin(), bf->end());
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[1], &values[3]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, &collect, &values[1], &values[3]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // BloomFilter with lower and upper bound
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    collect = *bf;
+    bf_copy = *bf;
+    collect.insert(collect.end(), bf->begin(), bf->end());
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[2], &values[4]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+  }
+
+ protected:
+  Random rand_;
 };
 
 TEST_F(TestColumnPredicate, TestMerge) {
@@ -1161,4 +1378,74 @@ TEST_F(TestColumnPredicate, TestRedaction) {
   ASSERT_EQ("a = <redacted>", ColumnPredicate::Equality(column_i32, &one_32).ToString());
 }
 
+TEST_F(TestColumnPredicate, TestBloomFilterMerge) {
+  int n_keys = 5; // Keys 0 and 1 hit both bf1 and bf2; keys 2, 3 and 4 hit only bf2.
+  // Test for INT64 type (the values are generated as uint64_t).
+  BloomFilterBuilder bfb1(
+          BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  double expected_fp_rate1 = bfb1.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
+  ASSERT_EQ(9, bfb1.n_bits() / n_keys);
+  BloomFilterBuilder bfb2(
+          BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  double expected_fp_rate2 = bfb2.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
+  ASSERT_EQ(9, bfb2.n_bits() / n_keys);
+  vector<uint64_t> values_int;
+  FillBloomFilterAndValues(n_keys, &values_int, &bfb1, &bfb2);
+  const Slice slice1 = bfb1.slice();
+  const Slice slice2 = bfb2.slice();
+  ColumnPredicate::BloomFilterInner bf1(slice1, bfb1.n_hashes(), MURMUR_HASH_2);
+  ColumnPredicate::BloomFilterInner bf2(slice2, bfb2.n_hashes(), MURMUR_HASH_2);
+  vector<ColumnPredicate::BloomFilterInner> bfs;
+  bfs.emplace_back(bf1);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
+  bfs.clear();
+  bfs.emplace_back(bf1);
+  bfs.emplace_back(bf2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
+
+  // Test for STRING type.
+  BloomFilterBuilder bfb3(
+          BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  double expected_fp_rate3 = bfb3.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate3, 0.01, 0.002);
+  ASSERT_EQ(9, bfb3.n_bits() / n_keys);
+  // Keys 0 and 1 are added to the bloom filter; keys 2, 3 and 4 are not.
+  vector<std::string> keys = {"0", "00", "10", "100", "1100"};
+  vector<Slice> keys_slice;
+  for (int i = 0; i < keys.size(); ++i) {
+    Slice key_slice(keys[i]);
+    BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+    if (i < 2) {
+      bfb3.AddKey(probe);
+    }
+    keys_slice.emplace_back(key_slice);
+  }
+  bfs.clear();
+  bfs.emplace_back(bfb3.slice(), bfb3.n_hashes(), MURMUR_HASH_2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, keys_slice);
+
+  // Test for BINARY type
+  BloomFilterBuilder bfb4(
+          BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  double expected_fp_rate4 = bfb4.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate4, 0.01, 0.002);
+  ASSERT_EQ(9, bfb4.n_bits() / n_keys);
+  vector<Slice> binary_keys = { Slice("", 0),
+                                Slice("\0", 1),
+                                Slice("\0\0", 2),
+                                Slice("\0\0\0", 3),
+                                Slice("\0\0\0\0", 4) };
+  for (int i = 0; i < binary_keys.size(); ++i) {
+    BloomKeyProbe probe(binary_keys[i], MURMUR_HASH_2);
+    if (i < 2) {
+      bfb4.AddKey(probe);
+    }
+  }
+  bfs.clear();
+  bfs.emplace_back(bfb4.slice(), bfb4.n_hashes(), MURMUR_HASH_2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, binary_keys);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc
index 1c784e6..b923ff4 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstring>
+#include <iterator>
 
 #include <boost/optional/optional.hpp>
 
@@ -27,6 +28,7 @@
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
@@ -59,6 +61,18 @@ ColumnPredicate::ColumnPredicate(PredicateType predicate_type,
   values_.swap(*values);
 }
 
+ColumnPredicate::ColumnPredicate(PredicateType predicate_type,
+                                 ColumnSchema column,
+                                 std::vector<BloomFilterInner>* bfs,
+                                 const void* lower,
+                                 const void* upper)
+    : predicate_type_(predicate_type),
+      column_(move(column)),
+      lower_(lower),
+      upper_(upper) {
+  bloom_filters_.swap(*bfs);
+}
+
 ColumnPredicate ColumnPredicate::Equality(ColumnSchema column, const void* value) {
   CHECK(value != nullptr);
   return ColumnPredicate(PredicateType::Equality, move(column), value, nullptr);
@@ -93,6 +107,17 @@ ColumnPredicate ColumnPredicate::InList(ColumnSchema column,
   return pred;
 }
 
+ColumnPredicate ColumnPredicate::InBloomFilter(ColumnSchema column,
+                                               std::vector<BloomFilterInner>* bfs,
+                                               const void* lower,
+                                               const void* upper) {
+  CHECK(bfs != nullptr);
+  CHECK(!bfs->empty());
+  ColumnPredicate pred(PredicateType::InBloomFilter, move(column), bfs, lower, upper);
+  pred.Simplify();
+  return pred;
+}
+
 boost::optional<ColumnPredicate> ColumnPredicate::InclusiveRange(ColumnSchema column,
                                                                  const void* lower,
                                                                  const void* upper,
@@ -167,7 +192,7 @@ void ColumnPredicate::SetToNone() {
   upper_ = nullptr;
 }
 
-// TODO: For decimal columns, use column_.type_attributes().precision
+// TODO(granthenke): For decimal columns, use column_.type_attributes().precision
 // to calculate the "true" max/min values for improved simplification.
 void ColumnPredicate::Simplify() {
   auto type_info = column_.type_info();
@@ -193,17 +218,13 @@ void ColumnPredicate::Simplify() {
         if (type_info->IsMinValue(lower_)) {
           predicate_type_ = PredicateType::IsNotNull;
           lower_ = nullptr;
-          upper_ = nullptr;
         } else if (type_info->IsMaxValue(lower_)) {
           predicate_type_ = PredicateType::Equality;
-          upper_ = nullptr;
         }
       } else if (upper_ != nullptr) {
         // VALUE < _
         if (type_info->IsMinValue(upper_)) {
-          predicate_type_ = PredicateType::None;
-          lower_ = nullptr;
-          upper_ = nullptr;
+          SetToNone();
         }
       }
       return;
@@ -229,6 +250,42 @@ void ColumnPredicate::Simplify() {
       }
       return;
     };
+    case PredicateType::InBloomFilter: {
+      if (lower_ == nullptr && upper_ == nullptr) {
+        return;
+      }
+      // Merge the optional lower and upper bound.
+      if (lower_ != nullptr && upper_ != nullptr) {
+        if (type_info->Compare(lower_, upper_) >= 0) {
+          // If the range bounds are empty then no results can be returned.
+          SetToNone();
+        } else if (type_info->AreConsecutive(lower_, upper_)) {
+          if (CheckValueInBloomFilter(lower_)) {
+            predicate_type_ = PredicateType::Equality;
+            upper_ = nullptr;
+            bloom_filters_.clear();
+          } else {
+            SetToNone();
+          }
+        }
+      } else if (lower_ != nullptr) {
+        if (type_info->IsMinValue(lower_)) {
+          lower_ = nullptr;
+        } else if (type_info->IsMaxValue(lower_)) {
+          if (CheckValueInBloomFilter(lower_)) {
+            predicate_type_ = PredicateType::Equality;
+            bloom_filters_.clear();
+          } else {
+            SetToNone();
+          }
+        }
+      } else if (upper_ != nullptr) {
+        if (type_info->IsMinValue(upper_)) {
+          SetToNone();
+        }
+      }
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -257,6 +314,10 @@ void ColumnPredicate::Merge(const ColumnPredicate& other) {
       MergeIntoInList(other);
       return;
     };
+    case PredicateType::InBloomFilter: {
+      MergeIntoBloomFilter(other);
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -269,7 +330,11 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
       SetToNone();
       return;
     };
-
+    case PredicateType::InBloomFilter: {
+      bloom_filters_ = other.bloom_filters_;
+      predicate_type_ = PredicateType::InBloomFilter;
+      FALLTHROUGH_INTENDED;
+    }
     case PredicateType::Range: {
       // Set the lower bound to the larger of the two.
       if (other.lower_ != nullptr &&
@@ -286,7 +351,6 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
       Simplify();
       return;
     };
-
     case PredicateType::Equality: {
       if ((lower_ != nullptr && column_.type_info()->Compare(lower_, other.lower_) > 0) ||
           (upper_ != nullptr && column_.type_info()->Compare(upper_, other.lower_) <= 0)) {
@@ -303,7 +367,7 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
     case PredicateType::IsNull: {
       SetToNone();
       return;
-    }
+    };
     case PredicateType::InList : {
       // The InList predicate values are examined to check whether
       // they lie in the range.
@@ -360,6 +424,12 @@ void ColumnPredicate::MergeIntoEquality(const ColumnPredicate& other) {
       }
       return;
     };
+    case PredicateType::InBloomFilter: {
+      if (!other.CheckValueInBloomFilter(lower_)) {
+        SetToNone();
+      }
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -378,6 +448,7 @@ void ColumnPredicate::MergeIntoIsNotNull(const ColumnPredicate &other) {
       lower_ = other.lower_;
       upper_ = other.upper_;
       values_ = other.values_;
+      bloom_filters_ = other.bloom_filters_;
       return;
     }
   }
@@ -499,6 +570,77 @@ void ColumnPredicate::MergeIntoInList(const ColumnPredicate &other) {
       Simplify();
       return;
     };
+    case PredicateType::InBloomFilter: {
+      std::vector<const void*> new_values;
+      std::copy_if(values_.begin(), values_.end(), std::back_inserter(new_values),
+                   [&] (const void* value) {
+                     return other.CheckValueInBloomFilter(value);
+                   });
+      values_.swap(new_values);
+      Simplify();
+      return;
+    };
+  }
+  LOG(FATAL) << "unknown predicate type";
+}
+
+void ColumnPredicate::MergeIntoBloomFilter(const ColumnPredicate &other) {
+  CHECK(predicate_type_ == PredicateType::InBloomFilter);
+  DCHECK(!bloom_filters_.empty());
+
+  switch (other.predicate_type()) {
+    case PredicateType::None: {
+      SetToNone();
+      return;
+    };
+    case PredicateType::InBloomFilter: {
+      bloom_filters_.insert(bloom_filters_.end(), other.bloom_filters().begin(),
+                            other.bloom_filters().end());
+      FALLTHROUGH_INTENDED;
+    }
+    case PredicateType::Range: {
+      // Merge the optional lower and upper bound.
+      if (other.lower_ != nullptr &&
+          (lower_ == nullptr || column_.type_info()->Compare(lower_, other.lower_) < 0)) {
+        lower_ = other.lower_;
+      }
+      if (other.upper_ != nullptr &&
+          (upper_ == nullptr || column_.type_info()->Compare(upper_, other.upper_) > 0)) {
+        upper_ = other.upper_;
+      }
+      Simplify();
+      return;
+    }
+    case PredicateType::Equality: {
+      if (CheckValueInBloomFilter(other.lower_)) {
+        // Value falls in bloom filters so change to Equality predicate.
+        predicate_type_ = PredicateType::Equality;
+        lower_ = other.lower_;
+        upper_ = nullptr;
+        bloom_filters_.clear();
+      } else {
+        SetToNone(); // Value does not fall in bloom filters.
+      }
+      return;
+    }
+    case PredicateType::IsNotNull: return;
+    case PredicateType::IsNull: {
+      SetToNone();
+      return;
+    }
+    case PredicateType::InList: {
+      DCHECK(other.values_.size() > 1);
+      std::vector<const void*> new_values;
+      std::copy_if(other.values_.begin(), other.values_.end(), std::back_inserter(new_values),
+                   [&] (const void* value) {
+                       return CheckValueInBloomFilter(value);
+                   });
+      predicate_type_ = PredicateType::InList;
+      values_.swap(new_values);
+      bloom_filters_.clear();
+      Simplify();
+      return;
+    }
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -588,6 +730,12 @@ void ColumnPredicate::EvaluateForPhysicalType(const ColumnBlock& block,
       return;
     };
     case PredicateType::None: LOG(FATAL) << "NONE predicate evaluation";
+    case PredicateType::InBloomFilter: {
+      ApplyPredicate(block, sel, [this] (const void* cell) {
+          return EvaluateCell<PhysicalType>(cell);
+      });
+      return;
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -666,6 +814,9 @@ string ColumnPredicate::ToString() const {
       ss.append(")");
       return ss;
     };
+    case PredicateType::InBloomFilter: {
+      return strings::Substitute("`$0` IS InBloomFilter", column_.name());
+    };
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -677,13 +828,18 @@ bool ColumnPredicate::operator==(const ColumnPredicate& other) const {
   }
   switch (predicate_type_) {
     case PredicateType::Equality: return column_.type_info()->Compare(lower_, other.lower_) == 0;
+    case PredicateType::InBloomFilter: {
+      if (bloom_filters_ != other.bloom_filters()) {
+        return false;
+      }
+      FALLTHROUGH_INTENDED;
+    };
     case PredicateType::Range: {
-      return (lower_ == other.lower_ ||
-              (lower_ != nullptr && other.lower_ != nullptr &&
-               column_.type_info()->Compare(lower_, other.lower_) == 0)) &&
-             (upper_ == other.upper_ ||
-              (upper_ != nullptr && other.upper_ != nullptr &&
-               column_.type_info()->Compare(upper_, other.upper_) == 0));
+      auto bound_equal = [&] (const void* eleml, const void* elemr) {
+          return (eleml == elemr || (eleml != nullptr && elemr != nullptr &&
+                                     column_.type_info()->Compare(eleml, elemr) == 0));
+      };
+      return bound_equal(lower_, other.lower_) && bound_equal(upper_, other.upper_);
     };
     case PredicateType::InList: {
       if (values_.size() != other.values_.size()) return false;
@@ -712,6 +868,10 @@ bool ColumnPredicate::CheckValueInList(const void* value) const {
                             });
 }
 
+bool ColumnPredicate::CheckValueInBloomFilter(const void* value) const {
+  return EvaluateCell(column_.type_info()->physical_type(), value);
+}
+
 namespace {
 int SelectivityRank(const ColumnPredicate& predicate) {
   int rank;
@@ -721,7 +881,8 @@ int SelectivityRank(const ColumnPredicate& predicate) {
     case PredicateType::Equality: rank = 2; break;
     case PredicateType::InList: rank = 3; break;
     case PredicateType::Range: rank = 4; break;
-    case PredicateType::IsNotNull: rank = 5; break;
+    case PredicateType::InBloomFilter: rank = 5; break;
+    case PredicateType::IsNotNull: rank = 6; break;
     default: LOG(FATAL) << "unknown predicate type";
   }
   return rank * (kLargestTypeSize + 1) + predicate.column().type_info()->size();

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h
index 0155185..ea8385b 100644
--- a/src/kudu/common/column_predicate.h
+++ b/src/kudu/common/column_predicate.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <cstddef>
+#include <cstdint>
+
 #include <algorithm>
 #include <ostream>
 #include <string>
@@ -28,6 +31,8 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
+#include "kudu/util/bloom_filter.h"
+#include "kudu/util/slice.h"
 
 namespace kudu {
 
@@ -56,6 +61,10 @@ enum class PredicateType {
   // A predicate which evaluates to true if the column value is present in
   // a value list.
   InList,
+
+  // A predicate which evaluates to true if the column value is present in
+  // a bloom filter.
+  InBloomFilter,
 };
 
 // A predicate which can be evaluated over a block of column values.
@@ -73,6 +82,8 @@ enum class PredicateType {
 class ColumnPredicate {
  public:
 
+  class BloomFilterInner;
+
   // Creates a new equality predicate on the column and value.
   //
   // The value is not copied, and must outlive the returned predicate.
@@ -130,6 +141,12 @@ class ColumnPredicate {
   // The InList will be simplified into an Equality, Range or None if possible.
   static ColumnPredicate InList(ColumnSchema column, std::vector<const void*>* values);
 
+  // Creates a new BloomFilter predicate for the column, with optional range bounds.
+  //
+  // 'bfs' is swapped into the predicate; the bounds are not copied and must outlive it.
+  static ColumnPredicate InBloomFilter(ColumnSchema column, std::vector<BloomFilterInner>* bfs,
+                                       const void* lower, const void* upper);
+
   // Creates a new predicate which matches no values.
   static ColumnPredicate None(ColumnSchema column);
 
@@ -174,12 +191,13 @@ class ColumnPredicate {
       case PredicateType::Range: {
         if (lower_ == nullptr) {
           return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0;
-        } else if (upper_ == nullptr) {
+        }
+        if (upper_ == nullptr) {
           return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
-        } else {
-          return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 &&
-                 DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
         }
+        return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 &&
+               DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
+
       };
       case PredicateType::Equality: {
         return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) == 0;
@@ -196,6 +214,9 @@ class ColumnPredicate {
                                     return DataTypeTraits<PhysicalType>::Compare(lhs, rhs) < 0;
                                   });
       };
+      case PredicateType::InBloomFilter: {
+        return EvaluateCellForBloomFilter<PhysicalType>(cell);
+      };
     }
     LOG(FATAL) << "unknown predicate type";
   }
@@ -233,6 +254,64 @@ class ColumnPredicate {
   const std::vector<const void*>& raw_values() const {
     return values_;
   }
+  // Returns bloom filters if this is a bloom filter predicate.
+  const std::vector<BloomFilterInner>& bloom_filters() const {
+    return bloom_filters_;
+  }
+
+  // This class represents a bloom filter used in a predicate.
+  class BloomFilterInner {
+   public:
+
+    BloomFilterInner(Slice bloom_data, size_t nhash, HashAlgorithm hash_algorithm) :
+            bloom_data_(bloom_data),
+            nhash_(nhash),
+            hash_algorithm_(hash_algorithm) {
+    }
+
+    BloomFilterInner() : nhash_(0), hash_algorithm_(CITY_HASH) {}
+
+    const Slice& bloom_data() const {
+      return bloom_data_;
+    }
+
+    size_t nhash() const {
+      return nhash_;
+    }
+
+    HashAlgorithm hash_algorithm() const {
+      return hash_algorithm_;
+    }
+
+    void set_nhash(size_t nhash) {
+      nhash_ = nhash;
+    }
+
+    void set_bloom_data(Slice bloom_data) {
+      bloom_data_ = bloom_data;
+    }
+
+    void set_hash_algorithm(HashAlgorithm hash_algorithm) {
+      hash_algorithm_ = hash_algorithm;
+    }
+
+    bool operator==(const BloomFilterInner& other) const {
+      return (bloom_data_ == other.bloom_data() &&
+              nhash_ == other.nhash() &&
+              hash_algorithm_ == other.hash_algorithm());
+    }
+
+   private:
+
+    // The slice holding the bloom filter data.
+    Slice bloom_data_;
+
+    // The number of hashes used in the bloom filter.
+    size_t nhash_;
+
+    // The hash algorithm used in bloom filter.
+    HashAlgorithm hash_algorithm_;
+  };
 
  private:
 
@@ -249,6 +328,13 @@ class ColumnPredicate {
                   ColumnSchema column,
                   std::vector<const void*>* values);
 
+  // Creates a new BloomFilter column predicate.
+  ColumnPredicate(PredicateType predicate_type,
+                  ColumnSchema column,
+                  std::vector<BloomFilterInner>* bfs,
+                  const void* lower,
+                  const void* upper);
+
   // Transition to a None predicate type.
   void SetToNone();
 
@@ -267,14 +353,49 @@ class ColumnPredicate {
   // Merge another predicate into this IS NULL predicate.
   void MergeIntoIsNull(const ColumnPredicate& other);
 
+  // Merge another predicate into this Bloom Filter predicate.
+  void MergeIntoBloomFilter(const ColumnPredicate& other);
+
+  // Merge another predicate into this InList predicate.
+  void MergeIntoInList(const ColumnPredicate& other);
+
   // Templated evaluation to inline the dispatch of comparator. Templating this
   // allows dispatch to occur only once per batch.
   template <DataType PhysicalType>
   void EvaluateForPhysicalType(const ColumnBlock& block,
                                SelectionVector* sel) const;
 
-  // Merge another predicate into this InList predicate.
-  void MergeIntoInList(const ColumnPredicate& other);
+  // Evaluate the bloom filter and avoid the predicate type check on a single cell.
+  template <DataType PhysicalType>
+  bool EvaluateCellForBloomFilter(const void* cell) const {
+    typedef typename DataTypeTraits<PhysicalType>::cpp_type cpp_type;
+    size_t size = sizeof(cpp_type);
+    const void* data = cell;
+    if (PhysicalType == BINARY) {
+      const Slice *slice = reinterpret_cast<const Slice *>(cell);
+      size = slice->size();
+      data = slice->data();
+    }
+    Slice cell_slice(reinterpret_cast<const uint8_t*>(data), size);
+    for (const auto& bf : bloom_filters_) {
+      BloomKeyProbe probe(cell_slice, bf.hash_algorithm());
+      if (!BloomFilter(bf.bloom_data(), bf.nhash()).MayContainKey(probe)) {
+        return false;
+      }
+    }
+    // Check optional lower and upper bound.
+    if (lower_ != nullptr && upper_ != nullptr) {
+      return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 &&
+             DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
+    }
+    if (upper_ != nullptr) {
+      return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0;
+    }
+    if (lower_ != nullptr) {
+      return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
+    }
+    return true;
+  }
 
   // For a Range type predicate, this helper function checks
   // whether a given value is in the range.
@@ -284,6 +405,10 @@ class ColumnPredicate {
   // whether a given value is in the list.
   bool CheckValueInList(const void* value) const;
 
+  // For a BloomFilter type predicate, this helper function checks
+  // whether a given value is in the BloomFilter.
+  bool CheckValueInBloomFilter(const void* value) const;
+
   // The type of this predicate.
   PredicateType predicate_type_;
 
@@ -299,6 +424,9 @@ class ColumnPredicate {
 
   // The list of values to check column against if this is an InList predicate.
   std::vector<const void*> values_;
+
+  // The list of bloom filters in this predicate.
+  std::vector<BloomFilterInner> bloom_filters_;
 };
 
 // Compares predicates according to selectivity. Predicates that match fewer

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index b3d323e..f0e9a33 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -285,6 +285,13 @@ enum ReplicaSelection {
   CLOSEST_REPLICA = 2;
 }
 
+// The hash algorithm used in bloom filter and hash bucket.
+enum HashAlgorithm {
+  UNKNOWN_HASH = 0;
+  MURMUR_HASH_2 = 1;
+  CITY_HASH = 2;
+}
+
 // The serialized format of a Kudu table partition schema.
 message PartitionSchemaPB {
 
@@ -319,11 +326,6 @@ message PartitionSchemaPB {
     // input.
     optional uint32 seed = 3;
 
-    enum HashAlgorithm {
-      UNKNOWN = 0;
-      MURMUR_HASH_2 = 1;
-    }
-
     // The hash algorithm to use for calculating the hash bucket.
     optional HashAlgorithm hash_algorithm = 4;
   }
@@ -348,6 +350,15 @@ message ColumnPredicatePB {
   // The predicate column name.
   optional string column = 1;
 
+  // Represent a bloom filter.
+  message BloomFilter {
+    // The number of hashes used by the bloom filter.
+    optional int32 nhash = 1;
+    // The bloom filter bitmap.
+    optional bytes bloom_data = 2 [(kudu.REDACT) = true];
+    optional HashAlgorithm hash_algorithm = 3 [default = CITY_HASH];
+  }
+
   message Range {
 
     // Bounds should be encoded as follows:
@@ -357,8 +368,7 @@ message ColumnPredicatePB {
     //
     // Note that this predicate type should not be used for NULL data --
     // NULL is defined to neither be greater than or less than other values
-    // for the comparison operator. We will eventually add a special
-    // predicate type for null-ness.
+    // for the comparison operator.
 
     // The inclusive lower bound.
     optional bytes lower = 1 [(kudu.REDACT) = true];
@@ -383,12 +393,28 @@ message ColumnPredicatePB {
 
   message IsNull {}
 
+  message InBloomFilter {
+    // A list of bloom filters for the field.
+    repeated BloomFilter bloom_filters = 1;
+
+    // Lower and upper bounds are optional for InBloomFilter.
+    // When both an InBloomFilter and a Range predicate apply to the same column,
+    // the merged result can be an InBloomFilter with the range bounds inside. The
+    // lower and upper bounds behave exactly as they do in a Range predicate.
+    // The inclusive lower bound.
+    optional bytes lower = 2 [(kudu.REDACT) = true];
+
+    // The exclusive upper bound.
+    optional bytes upper = 3 [(kudu.REDACT) = true];
+  }
+
   oneof predicate {
     Range range = 2;
     Equality equality = 3;
     IsNotNull is_not_null = 4;
     InList in_list = 5;
     IsNull is_null = 6;
+    InBloomFilter in_bloom_filter = 7;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
index ed8dd86..ec98dec 100644
--- a/src/kudu/common/key_util.cc
+++ b/src/kudu/common/key_util.cc
@@ -231,6 +231,7 @@ int PushUpperBoundKeyPredicates(ColIdxIter first,
         memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size);
         pushed_predicates++;
         break;
+      case PredicateType::InBloomFilter:  // Upper in InBloomFilter processed as upper in Range.
       case PredicateType::Range:
         if (predicate->raw_upper() != nullptr) {
           memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_upper(), size);
@@ -297,6 +298,7 @@ int PushLowerBoundKeyPredicates(ColIdxIter first,
     size_t size = column.type_info()->size();
 
     switch (predicate->predicate_type()) {
+      case PredicateType::InBloomFilter: // Lower in InBloomFilter processed as lower in Range.
       case PredicateType::Range:
         if (predicate->raw_lower() == nullptr) {
           break_loop = true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/scan_spec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index f5caab8..0660c10 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -193,6 +193,10 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
         // InList predicates should not be removed as the full constraints imposed by an InList
         // cannot be translated into only a single set of lower and upper bound primary keys
         break;
+      } else if (type == PredicateType::InBloomFilter) {
+        // InBloomFilter predicates should not be removed as the full constraints imposed by bloom
+        // filters cannot be translated into only a single set of lower and upper bound primary keys
+        break;
       } else {
         LOG(FATAL) << "Can not remove unknown predicate type";
       }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 3d06b22..9c11de3 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/common/wire_protocol.h"
+
 #include <cstddef>
 #include <cstdint>
+#include <memory>
 #include <string>
 #include <vector>
 
@@ -29,9 +32,10 @@
 #include "kudu/common/row.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/port.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/bloom_filter.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/memory/arena.h"
@@ -43,6 +47,7 @@
 #include "kudu/util/test_util.h"
 
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 namespace kudu {
@@ -499,4 +504,125 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
     ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument());
   }
 }
+
+class BFWireProtocolTest : public KuduTest {
+ public:
+  BFWireProtocolTest()
+      : schema_({ ColumnSchema("col1", INT32)}, 1),
+        arena_(1024),
+        n_keys_(100) {
+    bfb1_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01)));
+    bfb2_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01)));
+  }
+
+  virtual void SetUp() OVERRIDE {
+    double expected_fp_rate1 = bfb1()->false_positive_rate();
+    ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
+    ASSERT_EQ(9, bfb1()->n_bits() / n_keys_);
+    double expected_fp_rate2 = bfb2()->false_positive_rate();
+    ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
+    ASSERT_EQ(9, bfb2()->n_bits() / n_keys_);
+    for (int i = 0; i < n_keys_; ++i) {
+      Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i));
+      BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+      bfb1()->AddKey(probe);
+      bfb2()->AddKey(probe);
+    }
+  }
+
+  BloomFilterBuilder* bfb1() const { return bfb1_.get(); }
+
+  BloomFilterBuilder* bfb2() const { return bfb2_.get(); }  // Distinct from bfb1().
+
+protected:
+  Schema schema_;
+  Arena arena_;
+  int n_keys_;
+  unique_ptr<BloomFilterBuilder> bfb1_;
+  unique_ptr<BloomFilterBuilder> bfb2_;
+};
+
+TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) {
+  boost::optional<ColumnPredicate> predicate;
+  ColumnSchema col1 = schema_.column(0);
+  { // Single BloomFilter predicate.
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate, ibf);
+  }
+
+  { // Multi BloomFilter predicate.
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate, ibf);
+  }
+}
+
+TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) {
+  boost::optional<ColumnPredicate> predicate;
+  ColumnSchema col1 = schema_.column(0);
+  { // Single BloomFilter with lower bound.
+    int lower = 1;
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate, ibf);
+  }
+
+  { // Single bloom filter with upper bound.
+    int upper = 4;
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate, ibf);
+  }
+
+  { // Single bloom filter with both lower and upper bound.
+    int lower = 1;
+    int upper = 4;
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate, ibf);
+  }
+
+  { // Multi bloom filter with both lower and upper bound.
+    int lower = 1;
+    int upper = 4;
+    vector<kudu::ColumnPredicate::BloomFilterInner> bfs;
+    bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2);
+    kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(ibf, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(predicate->predicate_type(), PredicateType::InBloomFilter);
+    ASSERT_EQ(predicate->bloom_filters().size(), ibf.bloom_filters().size());
+    ASSERT_EQ(predicate, ibf);
+  }
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 0a5ce2a..2aab6f0 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -409,6 +409,16 @@ void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, stri
   bound_dst->assign(reinterpret_cast<const char*>(src), size);
 }
 
+// Copies a predicate bloom filter data from 'bf_src' into 'bf_dst'.
+void CopyPredicateBloomFilterToPB(const ColumnPredicate::BloomFilterInner& bf_src,
+                                  ColumnPredicatePB::BloomFilter* bf_dst) {
+  bf_dst->set_nhash(bf_src.nhash());
+  const void* src = bf_src.bloom_data().data();
+  size_t size = bf_src.bloom_data().size();
+  bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(src), size);
+  bf_dst->set_hash_algorithm(bf_src.hash_algorithm());
+}
+
 // Extract a void* pointer suitable for use in a ColumnRangePredicate from the
 // string protobuf bound. This validates that the pb_value has the correct
 // length, copies the data into 'arena', and sets *result to point to it.
@@ -439,6 +449,21 @@ Status CopyPredicateBoundFromPB(const ColumnSchema& schema,
 
   return Status::OK();
 }
+
+// Extract BloomFilterInner from bloom data for ColumnBloomFilterPredicate.
+Status CopyPredicateBloomFilterFromPB(const ColumnPredicatePB::BloomFilter& bf_src,
+                                      ColumnPredicate::BloomFilterInner* dst_src,
+                                      Arena* arena) {
+  size_t bloom_data_size = bf_src.bloom_data().size();
+  dst_src->set_nhash(bf_src.nhash());
+  // Copy the data from the protobuf into the Arena.
+  uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(bloom_data_size));
+  memcpy(data_copy, bf_src.bloom_data().data(), bloom_data_size);
+  dst_src->set_bloom_data(Slice(data_copy, bloom_data_size));
+  dst_src->set_hash_algorithm(bf_src.hash_algorithm());
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 void ColumnPredicateToPB(const ColumnPredicate& predicate,
@@ -481,6 +506,25 @@ void ColumnPredicateToPB(const ColumnPredicate& predicate,
       return;
     };
     case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf";
+    case PredicateType::InBloomFilter: {
+      auto* bloom_filter_pred = pb->mutable_in_bloom_filter();
+      for (const auto& bf : predicate.bloom_filters()) {
+        ColumnPredicatePB::BloomFilter* bloom_filter = bloom_filter_pred->add_bloom_filters();
+        CopyPredicateBloomFilterToPB(bf, bloom_filter);
+      }
+      // Form the optional lower and upper bound.
+      if (predicate.raw_lower() != nullptr) {
+        CopyPredicateBoundToPB(predicate.column(),
+                               predicate.raw_lower(),
+                               bloom_filter_pred->mutable_lower());
+      }
+      if (predicate.raw_upper() != nullptr) {
+        CopyPredicateBoundToPB(predicate.column(),
+                               predicate.raw_upper(),
+                               bloom_filter_pred->mutable_upper());
+      }
+      return;
+    }
   }
   LOG(FATAL) << "unknown predicate type";
 }
@@ -546,9 +590,40 @@ Status ColumnPredicateFromPB(const Schema& schema,
       break;
     };
     case ColumnPredicatePB::kIsNull: {
-        *predicate = ColumnPredicate::IsNull(col);
-        break;
+      *predicate = ColumnPredicate::IsNull(col);
+      break;
+    };
+    case ColumnPredicatePB::kInBloomFilter: {
+      const auto& in_bloom_filter = pb.in_bloom_filter();
+      vector<ColumnPredicate::BloomFilterInner> bloom_filters;
+      if (in_bloom_filter.bloom_filters_size() == 0) {
+        return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
+                                       "no bloom filter contained", col.name());
+      }
+      for (const auto& bf : in_bloom_filter.bloom_filters()) {
+        if (!bf.has_nhash()
+            || !bf.has_bloom_data()
+            || !bf.has_hash_algorithm()
+            || bf.hash_algorithm() == UNKNOWN_HASH) {
+          return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
+                                         "missing bloom filter details", col.name());
+        }
+        ColumnPredicate::BloomFilterInner bloom_filter;
+        RETURN_NOT_OK(CopyPredicateBloomFilterFromPB(bf, &bloom_filter, arena));
+        bloom_filters.emplace_back(bloom_filter);
+      }
+      // Extract the optional lower and upper bound.
+      const void* lower = nullptr;
+      const void* upper = nullptr;
+      if (in_bloom_filter.has_lower()) {
+        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.lower(), arena, &lower));
       }
+      if (in_bloom_filter.has_upper()) {
+        RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.upper(), arena, &upper));
+      }
+      *predicate = ColumnPredicate::InBloomFilter(col, &bloom_filters, lower, upper);
+      break;
+    };
     default: return Status::InvalidArgument("Unknown predicate type for column", col.name());
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/tablet/cfile_set-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index cef4b71..4ccdb17 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/tablet/cfile_set.h"
+
+#include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <iterator>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -24,8 +28,8 @@
 
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
-#include <gtest/gtest.h>
 #include <glog/logging.h>
+#include <gtest/gtest.h>
 
 #include "kudu/common/column_materialization_context.h"
 #include "kudu/common/column_predicate.h"
@@ -44,13 +48,13 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/stringpiece.h"
-#include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/diskrowset.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/util/auto_release_pool.h"
 #include "kudu/util/bloom_filter.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
@@ -100,6 +104,75 @@ class TestCFileSet : public KuduRowSetTest {
     ASSERT_OK(rsw.Finish());
   }
 
+  // Adds Int32 key probes to the bloom filters.
+  // bf1_contain: 0 2 4 6 8 ... (2n)th key for column 1 to form bloom filter.
+  // bf1_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 1 to form bloom filter.
+  // bf2_contain: 0 2 4 6 8 ... (2n)th key for column 2 to form bloom filter.
+  // bf2_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 2 to form bloom filter.
+  void FillBloomFilter(int nrows,
+                       BloomFilterBuilder* bf1_contain,
+                       BloomFilterBuilder* bf1_exclude,
+                       BloomFilterBuilder* bf2_contain,
+                       BloomFilterBuilder* bf2_exclude) {
+    int ratio[] = {2, 10, 100};
+    bool add = true;
+    for (int i = 0; i < nrows; ++i) {
+      int curr1 = i * ratio[0];
+      int curr2 = i * ratio[1];
+      Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1));
+      Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2));
+      BloomKeyProbe probe1(first, MURMUR_HASH_2);
+      BloomKeyProbe probe2(second, MURMUR_HASH_2);
+
+      if (add) {
+        bf1_contain->AddKey(probe1);
+        bf2_contain->AddKey(probe2);
+      } else {
+        bf1_exclude->AddKey(probe1);
+        bf2_exclude->AddKey(probe2);
+      }
+      add = !add;
+    }
+  }
+
+  // Probes each bloom filter with every Int32 key generated for its column.
+  // ret1_contain: to get the key hits in bf1_contain for column 1.
+  // ret1_exclude: to get the key hits in bf1_exclude for column 1.
+  // ret2_contain: to get the key hits in bf2_contain for column 2.
+  // ret2_exclude: to get the key hits in bf2_exclude for column 2.
+  // A key may hit both the contain and the exclude bloom filter,
+  // so we record the exact hits of each bloom filter for the tests that follow.
+  void GetBloomFilterResult(int nrows, BloomFilterBuilder* bf1_contain,
+                            BloomFilterBuilder* bf1_exclude,
+                            BloomFilterBuilder* bf2_contain,
+                            BloomFilterBuilder* bf2_exclude,
+                            vector<size_t>* ret1_contain,
+                            vector<size_t>* ret1_exclude,
+                            vector<size_t>* ret2_contain,
+                            vector<size_t>* ret2_exclude) {
+    int ratio[] = {2, 10, 100};
+    for (int i = 0; i < nrows; ++i) {
+      int curr1 = i * ratio[0];
+      int curr2 = i * ratio[1];
+      Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1));
+      Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2));
+      BloomKeyProbe probe1(first, MURMUR_HASH_2);
+      BloomKeyProbe probe2(second, MURMUR_HASH_2);
+      if (BloomFilter(bf1_contain->slice(), bf1_contain->n_hashes()).MayContainKey(probe1)) {
+        ret1_contain->push_back(i);
+      }
+      if (BloomFilter(bf1_exclude->slice(), bf1_exclude->n_hashes()).MayContainKey(probe1)) {
+        ret1_exclude->push_back(i);
+      }
+      if (BloomFilter(bf2_contain->slice(), bf2_contain->n_hashes()).MayContainKey(probe2)) {
+        ret2_contain->push_back(i);
+      }
+      if (BloomFilter(bf2_exclude->slice(), bf2_exclude->n_hashes()).MayContainKey(probe2)) {
+        ret2_exclude->push_back(i);
+      }
+    }
+  }
+
   // Issue a range scan between 'lower' and 'upper', and verify that all result
   // rows indeed fall inside that predicate.
   void DoTestRangeScan(const shared_ptr<CFileSet> &fileset,
@@ -135,6 +208,47 @@ class TestCFileSet : public KuduRowSetTest {
     }
   }
 
+  // Issue a BloomFilter scan and verify that all result
+  // rows indeed fall inside that predicate.
+  void DoTestBloomFilterScan(const shared_ptr<CFileSet>& fileset,
+                             vector<ColumnPredicate> predicates,
+                             vector<size_t> target) {
+    LOG(INFO) << "predicates size: " << predicates.size();
+    // Create iterator.
+    shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_));
+    gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter));
+    LOG(INFO) << "Target size: " << target.size();
+    // Create a scan with the given predicates.
+    ScanSpec spec;
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+    ASSERT_OK(iter->Init(&spec));
+    // Check that every selected row is one of the expected target rows.
+    Arena arena(1024);
+    RowBlock block(schema_, 100, &arena);
+    while (iter->HasNext()) {
+      ASSERT_OK_FAST(iter->NextBlock(&block));
+      for (size_t i = 0; i < block.nrows(); i++) {
+        if (block.selection_vector()->IsRowSelected(i)) {
+          RowBlockRow row = block.row(i);
+          size_t index = row.row_index();
+          vector<size_t>::iterator iter = std::find(target.begin(), target.end(), index);
+          if (iter == target.end()) {
+            FAIL() << "Row " << schema_.DebugRow(row) << " should not have "
+                   << "passed predicate ";
+          }
+          target.erase(iter);
+        }
+      }
+    }
+    LOG(INFO) << "Selected size: " << block.selection_vector()->CountSelected();
+    if (!target.empty()) {
+      FAIL() << "Target size " << target.size() << " should have "
+             << "passed predicate ";
+    }
+  }
+
   Status MaterializeColumn(CFileSet::Iterator *iter,
                            size_t col_idx,
                            ColumnBlock *cb) {
@@ -349,6 +463,99 @@ TEST_F(TestCFileSet, TestRangePredicates2) {
   DoTestRangeScan(fileset, kNumRows * 10, kNoBound);
 }
 
+TEST_F(TestCFileSet, TestBloomFilterPredicates) {
+  const int kNumRows = 100;
+  BloomFilterBuilder bfb1_contain(
+          BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate1 = bfb1_contain.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
+  ASSERT_EQ(9, bfb1_contain.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb1_exclude(
+          BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate11 = bfb1_exclude.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate11, 0.01, 0.002);
+  ASSERT_EQ(9, bfb1_exclude.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb2_contain(
+          BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate2 = bfb2_contain.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
+  ASSERT_EQ(9, bfb2_contain.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb2_exclude(
+          BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate22 = bfb2_exclude.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate22, 0.01, 0.002);
+  ASSERT_EQ(9, bfb2_exclude.n_bits() / kNumRows);
+
+  WriteTestRowSet(kNumRows);
+  vector<size_t> ret1_contain;
+  vector<size_t> ret1_exclude;
+  vector<size_t> ret2_contain;
+  vector<size_t> ret2_exclude;
+  FillBloomFilter(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude);
+  GetBloomFilterResult(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude,
+                       &ret1_contain, &ret1_exclude, &ret2_contain, &ret2_exclude);
+
+  shared_ptr<CFileSet> fileset;
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+
+  vector<ColumnPredicate::BloomFilterInner> bfs;
+  // BloomFilter of column 0 contain.
+  ColumnPredicate::BloomFilterInner bf1_contain(bfb1_contain.slice(),
+                                                bfb1_contain.n_hashes(), MURMUR_HASH_2);
+  bfs.push_back(bf1_contain);
+  auto pred1_contain = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, nullptr, nullptr);
+  DoTestBloomFilterScan(fileset, { pred1_contain }, ret1_contain);
+
+  // BloomFilter of column 1 contain.
+  ColumnPredicate::BloomFilterInner bf2_contain(bfb2_contain.slice(),
+                                                bfb2_contain.n_hashes(), MURMUR_HASH_2);
+  bfs.clear();
+  bfs.push_back(bf2_contain);
+  auto pred2_contain = ColumnPredicate::InBloomFilter(schema_.column(1), &bfs, nullptr, nullptr);
+  DoTestBloomFilterScan(fileset, { pred2_contain }, ret2_contain);
+
+  // BloomFilter of column 0 contain and exclude.
+  ColumnPredicate::BloomFilterInner bf1_exclude(bfb1_exclude.slice(),
+                                                bfb1_exclude.n_hashes(), MURMUR_HASH_2);
+  bfs.clear();
+  bfs.push_back(bf1_contain);
+  bfs.push_back(bf1_exclude);
+  vector<size_t> ret1_contain_exclude;
+  auto pred1_contain_exclude = ColumnPredicate::InBloomFilter(schema_.column(0),
+                                                              &bfs, nullptr, nullptr);
+  std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret1_exclude.begin(),
+                        ret1_exclude.end(), std::back_inserter(ret1_contain_exclude));
+  DoTestBloomFilterScan(fileset, { pred1_contain_exclude }, ret1_contain_exclude);
+  // BloomFilter of column 0 contain and column 1 contain.
+  vector<size_t> ret12_contain_contain;
+  std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret2_contain.begin(),
+                        ret2_contain.end(), std::back_inserter(ret12_contain_contain));
+  DoTestBloomFilterScan(fileset, { pred1_contain, pred2_contain }, ret12_contain_contain);
+
+  // BloomFilter of column 0 contain with lower and upper bound.
+  int32_t lower = 8;
+  int32_t upper = 58;
+  int32_t lower_row_index = lower / 2;
+  int32_t upper_row_index = upper / 2;
+  vector<size_t> ret1_contain_range = ret1_contain;
+  vector<size_t>::iterator left = std::lower_bound(ret1_contain_range.begin(),
+                                  ret1_contain_range.end(), lower_row_index);
+  ret1_contain_range.erase(ret1_contain_range.begin(), left); // don't erase left
+  vector<size_t>::iterator right = std::lower_bound(ret1_contain_range.begin(),
+                                   ret1_contain_range.end(), upper_row_index);
+  ret1_contain_range.erase(right, ret1_contain_range.end()); // earse right
+  auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper);
+  DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range);
+
+  // BloomFilter of column 0 contain with lower and upper bound inside the predicate.
+  bfs.clear();
+  bfs.push_back(bf1_contain);
+  auto bf_with_range = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, &lower, &upper);
+  DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range);
+}
 
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/util/bloom_filter.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/bloom_filter.h b/src/kudu/util/bloom_filter.h
index ad4e3eb..0905fda 100644
--- a/src/kudu/util/bloom_filter.h
+++ b/src/kudu/util/bloom_filter.h
@@ -20,11 +20,13 @@
 #include <cstddef>
 #include <cstdint>
 
+#include "kudu/common/common.pb.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/hash/city.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/bitmap.h"
+#include "kudu/util/hash_util.h"
 #include "kudu/util/slice.h"
 
 namespace kudu {
@@ -52,11 +54,22 @@ class BloomKeyProbe {
   //
   // NOTE: proper operation requires that the referenced memory remain
   // valid for the lifetime of this object.
-  explicit BloomKeyProbe(const Slice &key) : key_(key) {
-    uint64_t h = util_hash::CityHash64(
-      reinterpret_cast<const char *>(key.data()),
-      key.size());
-
+  explicit BloomKeyProbe(const Slice &key, HashAlgorithm hash_algorithm = CITY_HASH)
+      : key_(key) {
+    uint64_t h = 0;
+    switch (hash_algorithm) {
+      case MURMUR_HASH_2:
+        h = HashUtil::MurmurHash2_64(
+                reinterpret_cast<const char *>(key.data()),
+                key.size(),
+                /*seed=*/0);
+        break;
+      case CITY_HASH:
+      default:
+        h = util_hash::CityHash64(
+                reinterpret_cast<const char *>(key.data()),
+                key.size());
+    }
     // Use the top and bottom halves of the 64-bit hash
     // as the two independent hash functions for mixing.
     h_1_ = static_cast<uint32_t>(h);