You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2018/10/11 22:50:51 UTC
kudu git commit: Implement BloomFilter Predicate in server side.
Repository: kudu
Updated Branches:
refs/heads/master c8dd7b53f -> 8af288a26
Implement BloomFilter Predicate in server side.
Change-Id: I62c2de42667d0255d94e19db773240f7f9ee636c
Reviewed-on: http://gerrit.cloudera.org:8080/11100
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8af288a2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8af288a2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8af288a2
Branch: refs/heads/master
Commit: 8af288a26a204e2acfc3aa4e642fba7de56b43bb
Parents: c8dd7b5
Author: triplesheep <tr...@gmail.com>
Authored: Wed Aug 1 10:54:21 2018 +0000
Committer: Dan Burkert <da...@apache.org>
Committed: Thu Oct 11 22:50:37 2018 +0000
----------------------------------------------------------------------
src/kudu/common/column_predicate-test.cc | 287 ++++++++++++++++++++++++++
src/kudu/common/column_predicate.cc | 193 +++++++++++++++--
src/kudu/common/column_predicate.h | 140 ++++++++++++-
src/kudu/common/common.proto | 40 +++-
src/kudu/common/key_util.cc | 2 +
src/kudu/common/scan_spec.cc | 4 +
src/kudu/common/wire_protocol-test.cc | 128 +++++++++++-
src/kudu/common/wire_protocol.cc | 79 ++++++-
src/kudu/tablet/cfile_set-test.cc | 211 ++++++++++++++++++-
src/kudu/util/bloom_filter.h | 23 ++-
10 files changed, 1068 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index d96cc41..e0b6b63 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -30,8 +30,10 @@
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/bloom_filter.h"
#include "kudu/util/int128.h"
#include "kudu/util/memory/arena.h"
+#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util.h"
@@ -41,6 +43,7 @@ namespace kudu {
class TestColumnPredicate : public KuduTest {
public:
+  // The fixture's random generator is seeded via SeedRandom() from the test
+  // utilities (presumably so a failing seed can be reproduced; verify in test_util.h).
+  TestColumnPredicate()
+      : rand_(SeedRandom()) {
+  }
// Test that when a is merged into b and vice versa, the result is equal to
// expected, and the resulting type is equal to type.
@@ -68,6 +71,42 @@ class TestColumnPredicate : public KuduTest {
ASSERT_EQ(b_base.predicate_type(), type);
}
+  // Appends n_keys strictly increasing random keys to 'values' and records them
+  // in the bloom filters being built by 'bfb1' and 'bfb2':
+  //  - the first two values are added to both bfb1 and bfb2;
+  //  - every later value is added to bfb2 only, and is chosen so that bfb1 does
+  //    NOT report it as possibly present (i.e. it is not a bfb1 false positive).
+  // Each key is hashed as the raw bytes of a uint64_t using MURMUR_HASH_2.
+  void FillBloomFilterAndValues(int n_keys,
+                                vector<uint64_t>* values,
+                                BloomFilterBuilder* bfb1,
+                                BloomFilterBuilder* bfb2) {
+    // Number of leading values that must hit both filters.
+    const int kNumSharedKeys = 2;
+    uint64_t current = 0;
+    for (int i = 0; i < kNumSharedKeys && i < n_keys; ++i) {
+      uint64_t key = 0;
+      do {
+        key = rand_.Next();
+      } while (key <= current);  // Keep the sequence strictly increasing.
+      current = key;
+      Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+      BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+      bfb1->AddKey(probe);
+      bfb2->AddKey(probe);
+      values->emplace_back(key);
+    }
+
+    // bfb1 receives no more keys from here on, so one reader over its bitmap is
+    // reused for every candidate instead of being rebuilt on each draw.
+    BloomFilter bf1(bfb1->slice(), bfb1->n_hashes());
+    for (int i = kNumSharedKeys; i < n_keys; ++i) {
+      while (true) {
+        uint64_t key = rand_.Next();
+        if (key <= current) {
+          continue;  // Keep the sequence strictly increasing.
+        }
+        Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+        BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+        if (bf1.MayContainKey(probe)) {
+          continue;  // Would look like a bfb1 member; draw again.
+        }
+        current = key;
+        values->emplace_back(key);
+        bfb2->AddKey(probe);
+        break;
+      }
+    }
+  }
+
template <typename T>
void TestMergeCombinations(const ColumnSchema& column, vector<T> values) {
// Range + Range
@@ -744,6 +783,184 @@ class TestColumnPredicate : public KuduTest {
ColumnPredicate::IsNull(column),
PredicateType::IsNull);
}
+
+  // Checks ColumnPredicate::Merge() between InBloomFilter predicates on 'column'
+  // and every other predicate type (TestMerge checks both merge orders).
+  //
+  // 'bf' holds the bloom filters under test. values[0] and values[1] must hit
+  // every filter in 'bf', while values[2], values[3] and values[4] must miss at
+  // least one of them. The values must be in strictly increasing order, since
+  // several cases below build ranges from them.
+  //
+  // ColumnPredicate::InBloomFilter() swaps the filters out of its argument, so
+  // '*bf' is restored before every case; callers should not rely on the
+  // contents of '*bf' after this returns.
+  template <typename T>
+  void TestMergeBloomFilterCombinations(const ColumnSchema& column,
+                                        vector<ColumnPredicate::BloomFilterInner>* bf,
+                                        const vector<T>& values) {
+    const vector<ColumnPredicate::BloomFilterInner> orig_bloom_filters = *bf;
+
+    // BloomFilter AND
+    // NONE
+    // =
+    // NONE
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::None(column),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // Equality (the value hits the bloom filters)
+    // =
+    // Equality
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::Equality(column, &values[0]),
+              ColumnPredicate::Equality(column, &values[0]),
+              PredicateType::Equality);
+
+    // BloomFilter AND
+    // Equality (the value misses the bloom filters)
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::Equality(column, &values[2]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // IS NOT NULL
+    // =
+    // BloomFilter
+    *bf = orig_bloom_filters;
+    vector<ColumnPredicate::BloomFilterInner> bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::IsNotNull(column),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, nullptr, nullptr),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter AND
+    // IS NULL
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::IsNull(column),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // None (no value in the list hits the bloom filters)
+    *bf = orig_bloom_filters;
+    vector<const void*> in_list = { &values[2], &values[3], &values[4] };
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // InList (every value in the list hits the bloom filters)
+    in_list = { &values[0], &values[1] };
+    vector<const void*> hit_list = { &values[0], &values[1] };
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::InList(column, &hit_list),
+              PredicateType::InList);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // InList (only some values in the list hit the bloom filters)
+    in_list = { &values[0], &values[1], &values[2], &values[3] };
+    hit_list = { &values[0], &values[1] };
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::InList(column, &hit_list),
+              PredicateType::InList);
+
+    // BloomFilter AND
+    // InList
+    // =
+    // Equality (only the first value in the list hits the bloom filters, so the
+    // result simplifies to an Equality)
+    in_list = { &values[0], &values[2], &values[3] };
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InList(column, &in_list),
+              ColumnPredicate::Equality(column, &values[0]),
+              PredicateType::Equality);
+
+    // Range AND
+    // BloomFilter
+    // =
+    // BloomFilter with lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::Range(column, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // Range
+    // =
+    // BloomFilter with the tighter lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[4]),
+              ColumnPredicate::Range(column, &values[1], &values[3]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[1], &values[3]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // Range (disjoint from the bloom filter bounds)
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
+              ColumnPredicate::Range(column, &values[2], &values[4]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+
+    // BloomFilter AND
+    // BloomFilter with lower and upper bound
+    // =
+    // BloomFilter holding both filter lists, with lower and upper bound
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    vector<ColumnPredicate::BloomFilterInner> collect = *bf;
+    collect.insert(collect.end(), bf->begin(), bf->end());
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, &collect, &values[0], &values[4]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // BloomFilter with lower and upper bound
+    // =
+    // BloomFilter holding both filter lists, with the tighter bounds
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    collect = *bf;
+    collect.insert(collect.end(), bf->begin(), bf->end());
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[1], &values[3]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]),
+              ColumnPredicate::InBloomFilter(column, &collect, &values[1], &values[3]),
+              PredicateType::InBloomFilter);
+
+    // BloomFilter with lower and upper bound AND
+    // BloomFilter with disjoint lower and upper bound
+    // =
+    // None
+    *bf = orig_bloom_filters;
+    bf_copy = *bf;
+    TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]),
+              ColumnPredicate::InBloomFilter(column, &bf_copy, &values[2], &values[4]),
+              ColumnPredicate::None(column),
+              PredicateType::None);
+  }
+
+ protected:
+ Random rand_;
};
TEST_F(TestColumnPredicate, TestMerge) {
@@ -1161,4 +1378,74 @@ TEST_F(TestColumnPredicate, TestRedaction) {
ASSERT_EQ("a = <redacted>", ColumnPredicate::Equality(column_i32, &one_32).ToString());
}
+TEST_F(TestColumnPredicate, TestBloomFilterMerge) {
+  // For every column type below, values 0 and 1 hit all bloom filters while
+  // values 2, 3 and 4 miss at least the first filter.
+  const int n_keys = 5;
+
+  // INT64 columns, keyed by the raw bytes of uint64_t values.
+  BloomFilterBuilder bfb1(
+      BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  ASSERT_NEAR(bfb1.false_positive_rate(), 0.01, 0.002);
+  ASSERT_EQ(9, bfb1.n_bits() / n_keys);
+  BloomFilterBuilder bfb2(
+      BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  ASSERT_NEAR(bfb2.false_positive_rate(), 0.01, 0.002);
+  ASSERT_EQ(9, bfb2.n_bits() / n_keys);
+  vector<uint64_t> values_int;
+  FillBloomFilterAndValues(n_keys, &values_int, &bfb1, &bfb2);
+  const Slice slice1 = bfb1.slice();
+  const Slice slice2 = bfb2.slice();
+  ColumnPredicate::BloomFilterInner bf1(slice1, bfb1.n_hashes(), MURMUR_HASH_2);
+  ColumnPredicate::BloomFilterInner bf2(slice2, bfb2.n_hashes(), MURMUR_HASH_2);
+  vector<ColumnPredicate::BloomFilterInner> bfs;
+  // A single filter.
+  bfs.emplace_back(bf1);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
+  // Two filters: values 2-4 pass bf2 but still fail bf1.
+  bfs.clear();
+  bfs.emplace_back(bf1);
+  bfs.emplace_back(bf2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int);
+
+  // STRING columns.
+  BloomFilterBuilder bfb3(
+      BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  ASSERT_NEAR(bfb3.false_positive_rate(), 0.01, 0.002);
+  ASSERT_EQ(9, bfb3.n_bits() / n_keys);
+  // Keys 0 and 1 are added to bfb3; keys 2, 3 and 4 are not (and are expected
+  // not to be false positives). The keys are in increasing order.
+  vector<std::string> keys = {"0", "00", "10", "100", "1100"};
+  vector<Slice> keys_slice;
+  for (size_t i = 0; i < keys.size(); ++i) {
+    Slice key_slice(keys[i]);
+    BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+    if (i < 2) {
+      bfb3.AddKey(probe);
+    }
+    keys_slice.emplace_back(key_slice);
+  }
+  bfs.clear();
+  bfs.emplace_back(bfb3.slice(), bfb3.n_hashes(), MURMUR_HASH_2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, keys_slice);
+
+  // BINARY columns, including keys with embedded NUL bytes and an empty key.
+  BloomFilterBuilder bfb4(
+      BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01));
+  ASSERT_NEAR(bfb4.false_positive_rate(), 0.01, 0.002);
+  ASSERT_EQ(9, bfb4.n_bits() / n_keys);
+  vector<Slice> binary_keys = { Slice("", 0),
+                                Slice("\0", 1),
+                                Slice("\0\0", 2),
+                                Slice("\0\0\0", 3),
+                                Slice("\0\0\0\0", 4) };
+  for (size_t i = 0; i < binary_keys.size(); ++i) {
+    BloomKeyProbe probe(binary_keys[i], MURMUR_HASH_2);
+    if (i < 2) {
+      bfb4.AddKey(probe);
+    }
+  }
+  bfs.clear();
+  bfs.emplace_back(bfb4.slice(), bfb4.n_hashes(), MURMUR_HASH_2);
+  TestMergeBloomFilterCombinations(ColumnSchema("c", BINARY, true), &bfs, binary_keys);
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc
index 1c784e6..b923ff4 100644
--- a/src/kudu/common/column_predicate.cc
+++ b/src/kudu/common/column_predicate.cc
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstring>
+#include <iterator>
#include <boost/optional/optional.hpp>
@@ -27,6 +28,7 @@
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
+#include "kudu/gutil/macros.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
@@ -59,6 +61,18 @@ ColumnPredicate::ColumnPredicate(PredicateType predicate_type,
values_.swap(*values);
}
+// Builds a predicate that carries bloom filters plus optional lower/upper
+// bounds. The filters are taken from '*bfs' by swapping, so no copy is made;
+// '*bfs' is left holding this object's freshly constructed (empty) list.
+ColumnPredicate::ColumnPredicate(PredicateType predicate_type,
+                                 ColumnSchema column,
+                                 std::vector<BloomFilterInner>* bfs,
+                                 const void* lower,
+                                 const void* upper)
+    : predicate_type_(predicate_type),
+      column_(move(column)),
+      lower_(lower),
+      upper_(upper) {
+  bfs->swap(bloom_filters_);
+}
+
ColumnPredicate ColumnPredicate::Equality(ColumnSchema column, const void* value) {
CHECK(value != nullptr);
return ColumnPredicate(PredicateType::Equality, move(column), value, nullptr);
@@ -93,6 +107,17 @@ ColumnPredicate ColumnPredicate::InList(ColumnSchema column,
return pred;
}
+// Factory for an InBloomFilter predicate. Requires a non-null, non-empty list
+// of filters, which is consumed (swapped out of '*bfs'). The result is passed
+// through Simplify(), which may turn it into an Equality or None predicate.
+ColumnPredicate ColumnPredicate::InBloomFilter(ColumnSchema column,
+                                               std::vector<BloomFilterInner>* bfs,
+                                               const void* lower,
+                                               const void* upper) {
+  CHECK(bfs != nullptr);
+  CHECK(!bfs->empty());
+  ColumnPredicate result(PredicateType::InBloomFilter, move(column), bfs, lower, upper);
+  result.Simplify();
+  return result;
+}
+
boost::optional<ColumnPredicate> ColumnPredicate::InclusiveRange(ColumnSchema column,
const void* lower,
const void* upper,
@@ -167,7 +192,7 @@ void ColumnPredicate::SetToNone() {
upper_ = nullptr;
}
-// TODO: For decimal columns, use column_.type_attributes().precision
+// TODO(granthenke): For decimal columns, use column_.type_attributes().precision
// to calculate the "true" max/min values for improved simplification.
void ColumnPredicate::Simplify() {
auto type_info = column_.type_info();
@@ -193,17 +218,13 @@ void ColumnPredicate::Simplify() {
if (type_info->IsMinValue(lower_)) {
predicate_type_ = PredicateType::IsNotNull;
lower_ = nullptr;
- upper_ = nullptr;
} else if (type_info->IsMaxValue(lower_)) {
predicate_type_ = PredicateType::Equality;
- upper_ = nullptr;
}
} else if (upper_ != nullptr) {
// VALUE < _
if (type_info->IsMinValue(upper_)) {
- predicate_type_ = PredicateType::None;
- lower_ = nullptr;
- upper_ = nullptr;
+ SetToNone();
}
}
return;
@@ -229,6 +250,42 @@ void ColumnPredicate::Simplify() {
}
return;
};
+ case PredicateType::InBloomFilter: {
+ if (lower_ == nullptr && upper_ == nullptr) {
+ return;
+ }
+ // Merge the optional lower and upper bound.
+ if (lower_ != nullptr && upper_ != nullptr) {
+ if (type_info->Compare(lower_, upper_) >= 0) {
+ // If the range bounds are empty then no results can be returned.
+ SetToNone();
+ } else if (type_info->AreConsecutive(lower_, upper_)) {
+ if (CheckValueInBloomFilter(lower_)) {
+ predicate_type_ = PredicateType::Equality;
+ upper_ = nullptr;
+ bloom_filters_.clear();
+ } else {
+ SetToNone();
+ }
+ }
+ } else if (lower_ != nullptr) {
+ if (type_info->IsMinValue(lower_)) {
+ lower_ = nullptr;
+ } else if (type_info->IsMaxValue(lower_)) {
+ if (CheckValueInBloomFilter(lower_)) {
+ predicate_type_ = PredicateType::Equality;
+ bloom_filters_.clear();
+ } else {
+ SetToNone();
+ }
+ }
+ } else if (upper_ != nullptr) {
+ if (type_info->IsMinValue(upper_)) {
+ SetToNone();
+ }
+ }
+ return;
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -257,6 +314,10 @@ void ColumnPredicate::Merge(const ColumnPredicate& other) {
MergeIntoInList(other);
return;
};
+ case PredicateType::InBloomFilter: {
+ MergeIntoBloomFilter(other);
+ return;
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -269,7 +330,11 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
SetToNone();
return;
};
-
+ case PredicateType::InBloomFilter: {
+ bloom_filters_ = other.bloom_filters_;
+ predicate_type_ = PredicateType::InBloomFilter;
+ FALLTHROUGH_INTENDED;
+ }
case PredicateType::Range: {
// Set the lower bound to the larger of the two.
if (other.lower_ != nullptr &&
@@ -286,7 +351,6 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
Simplify();
return;
};
-
case PredicateType::Equality: {
if ((lower_ != nullptr && column_.type_info()->Compare(lower_, other.lower_) > 0) ||
(upper_ != nullptr && column_.type_info()->Compare(upper_, other.lower_) <= 0)) {
@@ -303,7 +367,7 @@ void ColumnPredicate::MergeIntoRange(const ColumnPredicate& other) {
case PredicateType::IsNull: {
SetToNone();
return;
- }
+ };
case PredicateType::InList : {
// The InList predicate values are examined to check whether
// they lie in the range.
@@ -360,6 +424,12 @@ void ColumnPredicate::MergeIntoEquality(const ColumnPredicate& other) {
}
return;
};
+ case PredicateType::InBloomFilter: {
+ if (!other.CheckValueInBloomFilter(lower_)) {
+ SetToNone();
+ }
+ return;
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -378,6 +448,7 @@ void ColumnPredicate::MergeIntoIsNotNull(const ColumnPredicate &other) {
lower_ = other.lower_;
upper_ = other.upper_;
values_ = other.values_;
+ bloom_filters_ = other.bloom_filters_;
return;
}
}
@@ -499,6 +570,77 @@ void ColumnPredicate::MergeIntoInList(const ColumnPredicate &other) {
Simplify();
return;
};
+ case PredicateType::InBloomFilter: {
+ std::vector<const void*> new_values;
+ std::copy_if(values_.begin(), values_.end(), std::back_inserter(new_values),
+ [&] (const void* value) {
+ return other.CheckValueInBloomFilter(value);
+ });
+ values_.swap(new_values);
+ Simplify();
+ return;
+ };
+ }
+ LOG(FATAL) << "unknown predicate type";
+}
+
+// Merges 'other' into this InBloomFilter predicate so that the result matches
+// only values matched by both. Bound pointers taken from 'other' are borrowed,
+// not copied, so 'other's bound values must outlive this predicate.
+void ColumnPredicate::MergeIntoBloomFilter(const ColumnPredicate &other) {
+ CHECK(predicate_type_ == PredicateType::InBloomFilter);
+ DCHECK(!bloom_filters_.empty());
+
+ switch (other.predicate_type()) {
+ case PredicateType::None: {
+ SetToNone();
+ return;
+ };
+ case PredicateType::InBloomFilter: {
+ // Append the other predicate's filters (a value must pass all of them),
+ // then intersect the optional bounds like a Range merge below.
+ bloom_filters_.insert(bloom_filters_.end(), other.bloom_filters().begin(),
+ other.bloom_filters().end());
+ FALLTHROUGH_INTENDED;
+ }
+ case PredicateType::Range: {
+ // Merge the optional lower and upper bound.
+ // Keep the larger lower bound and the smaller upper bound.
+ if (other.lower_ != nullptr &&
+ (lower_ == nullptr || column_.type_info()->Compare(lower_, other.lower_) < 0)) {
+ lower_ = other.lower_;
+ }
+ if (other.upper_ != nullptr &&
+ (upper_ == nullptr || column_.type_info()->Compare(upper_, other.upper_) > 0)) {
+ upper_ = other.upper_;
+ }
+ // The bounds may now be empty or a single value; let Simplify() reduce them.
+ Simplify();
+ return;
+ }
+ case PredicateType::Equality: {
+ // The equality value is tested against this predicate (its filters and,
+ // presumably via EvaluateCell, any bounds).
+ if (CheckValueInBloomFilter(other.lower_)) {
+ // Value falls in bloom filters so change to Equality predicate.
+ predicate_type_ = PredicateType::Equality;
+ lower_ = other.lower_;
+ upper_ = nullptr;
+ bloom_filters_.clear();
+ } else {
+ SetToNone(); // Value does not fall in bloom filters.
+ }
+ return;
+ }
+ // IS NOT NULL adds no constraint here; this predicate is left unchanged.
+ case PredicateType::IsNotNull: return;
+ case PredicateType::IsNull: {
+ SetToNone();
+ return;
+ }
+ case PredicateType::InList: {
+ // Keep only the list values accepted by this predicate; the result becomes
+ // an InList, which Simplify() may further reduce (e.g. to Equality or None).
+ DCHECK(other.values_.size() > 1);
+ std::vector<const void*> new_values;
+ std::copy_if(other.values_.begin(), other.values_.end(), std::back_inserter(new_values),
+ [&] (const void* value) {
+ return CheckValueInBloomFilter(value);
+ });
+ predicate_type_ = PredicateType::InList;
+ values_.swap(new_values);
+ bloom_filters_.clear();
+ Simplify();
+ return;
+ }
}
LOG(FATAL) << "unknown predicate type";
}
@@ -588,6 +730,12 @@ void ColumnPredicate::EvaluateForPhysicalType(const ColumnBlock& block,
return;
};
case PredicateType::None: LOG(FATAL) << "NONE predicate evaluation";
+ case PredicateType::InBloomFilter: {
+ ApplyPredicate(block, sel, [this] (const void* cell) {
+ return EvaluateCell<PhysicalType>(cell);
+ });
+ return;
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -666,6 +814,9 @@ string ColumnPredicate::ToString() const {
ss.append(")");
return ss;
};
+ case PredicateType::InBloomFilter: {
+ return strings::Substitute("`$0` IS InBloomFilter", column_.name());
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -677,13 +828,18 @@ bool ColumnPredicate::operator==(const ColumnPredicate& other) const {
}
switch (predicate_type_) {
case PredicateType::Equality: return column_.type_info()->Compare(lower_, other.lower_) == 0;
+ case PredicateType::InBloomFilter: {
+ if (bloom_filters_ != other.bloom_filters()) {
+ return false;
+ }
+ FALLTHROUGH_INTENDED;
+ };
case PredicateType::Range: {
- return (lower_ == other.lower_ ||
- (lower_ != nullptr && other.lower_ != nullptr &&
- column_.type_info()->Compare(lower_, other.lower_) == 0)) &&
- (upper_ == other.upper_ ||
- (upper_ != nullptr && other.upper_ != nullptr &&
- column_.type_info()->Compare(upper_, other.upper_) == 0));
+ auto bound_equal = [&] (const void* eleml, const void* elemr) {
+ return (eleml == elemr || (eleml != nullptr && elemr != nullptr &&
+ column_.type_info()->Compare(eleml, elemr) == 0));
+ };
+ return bound_equal(lower_, other.lower_) && bound_equal(upper_, other.upper_);
};
case PredicateType::InList: {
if (values_.size() != other.values_.size()) return false;
@@ -712,6 +868,10 @@ bool ColumnPredicate::CheckValueInList(const void* value) const {
});
}
+bool ColumnPredicate::CheckValueInBloomFilter(const void* value) const {
+  // Evaluate 'value' against this predicate using the column's physical type.
+  // NOTE(review): assumes EvaluateCell(DataType, ...) dispatches to the templated
+  // EvaluateCell<PhysicalType>, which for InBloomFilter checks filters and bounds.
+  const DataType physical_type = column_.type_info()->physical_type();
+  return EvaluateCell(physical_type, value);
+}
+
namespace {
int SelectivityRank(const ColumnPredicate& predicate) {
int rank;
@@ -721,7 +881,8 @@ int SelectivityRank(const ColumnPredicate& predicate) {
case PredicateType::Equality: rank = 2; break;
case PredicateType::InList: rank = 3; break;
case PredicateType::Range: rank = 4; break;
- case PredicateType::IsNotNull: rank = 5; break;
+ case PredicateType::InBloomFilter: rank = 5; break;
+ case PredicateType::IsNotNull: rank = 6; break;
default: LOG(FATAL) << "unknown predicate type";
}
return rank * (kLargestTypeSize + 1) + predicate.column().type_info()->size();
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/column_predicate.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h
index 0155185..ea8385b 100644
--- a/src/kudu/common/column_predicate.h
+++ b/src/kudu/common/column_predicate.h
@@ -17,6 +17,9 @@
#pragma once
+#include <cstddef>
+#include <cstdint>
+
#include <algorithm>
#include <ostream>
#include <string>
@@ -28,6 +31,8 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
+#include "kudu/util/bloom_filter.h"
+#include "kudu/util/slice.h"
namespace kudu {
@@ -56,6 +61,10 @@ enum class PredicateType {
// A predicate which evaluates to true if the column value is present in
// a value list.
InList,
+
+ // A predicate which evaluates to true if the column value is present in
+ // a bloom filter.
+ InBloomFilter,
};
// A predicate which can be evaluated over a block of column values.
@@ -73,6 +82,8 @@ enum class PredicateType {
class ColumnPredicate {
public:
+ class BloomFilterInner;
+
// Creates a new equality predicate on the column and value.
//
// The value is not copied, and must outlive the returned predicate.
@@ -130,6 +141,12 @@ class ColumnPredicate {
// The InList will be simplified into an Equality, Range or None if possible.
static ColumnPredicate InList(ColumnSchema column, std::vector<const void*>* values);
+ // Create a new BloomFilter predicate for the column.
+ //
+ // The values are not copied, and must outlive the returned predicate.
+ static ColumnPredicate InBloomFilter(ColumnSchema column, std::vector<BloomFilterInner>* bfs,
+ const void* lower, const void* upper);
+
// Creates a new predicate which matches no values.
static ColumnPredicate None(ColumnSchema column);
@@ -174,12 +191,13 @@ class ColumnPredicate {
case PredicateType::Range: {
if (lower_ == nullptr) {
return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0;
- } else if (upper_ == nullptr) {
+ }
+ if (upper_ == nullptr) {
return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
- } else {
- return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 &&
- DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
}
+ return DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) < 0 &&
+ DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) >= 0;
+
};
case PredicateType::Equality: {
return DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) == 0;
@@ -196,6 +214,9 @@ class ColumnPredicate {
return DataTypeTraits<PhysicalType>::Compare(lhs, rhs) < 0;
});
};
+ case PredicateType::InBloomFilter: {
+ return EvaluateCellForBloomFilter<PhysicalType>(cell);
+ };
}
LOG(FATAL) << "unknown predicate type";
}
@@ -233,6 +254,64 @@ class ColumnPredicate {
const std::vector<const void*>& raw_values() const {
return values_;
}
+  // The bloom filters of an InBloomFilter predicate (a value must pass all of them).
+  const std::vector<BloomFilterInner>& bloom_filters() const { return bloom_filters_; }
+
+  // Describes one bloom filter referenced by an InBloomFilter predicate:
+  // the filter bitmap, the number of hash functions applied per key, and the
+  // hash algorithm used to probe it. The bitmap is held as a Slice, which
+  // does not own its bytes; the caller must keep them alive.
+  class BloomFilterInner {
+   public:
+    // An empty description: no bitmap data, zero hashes, CITY_HASH.
+    BloomFilterInner() = default;
+
+    BloomFilterInner(Slice bloom_data, size_t nhash, HashAlgorithm hash_algorithm)
+        : bloom_data_(bloom_data),
+          nhash_(nhash),
+          hash_algorithm_(hash_algorithm) {}
+
+    const Slice& bloom_data() const { return bloom_data_; }
+    size_t nhash() const { return nhash_; }
+    HashAlgorithm hash_algorithm() const { return hash_algorithm_; }
+
+    void set_nhash(size_t nhash) { nhash_ = nhash; }
+    void set_bloom_data(Slice bloom_data) { bloom_data_ = bloom_data; }
+    void set_hash_algorithm(HashAlgorithm hash_algorithm) { hash_algorithm_ = hash_algorithm; }
+
+    // Equal when the bitmaps compare equal under Slice::operator== and the
+    // hash count and hash algorithm match.
+    bool operator==(const BloomFilterInner& other) const {
+      return bloom_data_ == other.bloom_data_ &&
+             nhash_ == other.nhash_ &&
+             hash_algorithm_ == other.hash_algorithm_;
+    }
+
+   private:
+    // The filter bitmap.
+    Slice bloom_data_;
+
+    // Number of hash functions applied to each key.
+    size_t nhash_ = 0;
+
+    // Algorithm used to compute key probes against this filter.
+    HashAlgorithm hash_algorithm_ = CITY_HASH;
+  };
private:
@@ -249,6 +328,13 @@ class ColumnPredicate {
ColumnSchema column,
std::vector<const void*>* values);
+ // Creates a new BloomFilter column predicate.
+ ColumnPredicate(PredicateType predicate_type,
+ ColumnSchema column,
+ std::vector<BloomFilterInner>* bfs,
+ const void* lower,
+ const void* upper);
+
// Transition to a None predicate type.
void SetToNone();
@@ -267,14 +353,49 @@ class ColumnPredicate {
// Merge another predicate into this IS NULL predicate.
void MergeIntoIsNull(const ColumnPredicate& other);
+ // Merge another predicate into this InBloomFilter predicate.
+ void MergeIntoBloomFilter(const ColumnPredicate& other);
+
+ // Merge another predicate into this InList predicate.
+ void MergeIntoInList(const ColumnPredicate& other);
+
// Templated evaluation to inline the dispatch of comparator. Templating this
// allows dispatch to occur only once per batch.
template <DataType PhysicalType>
void EvaluateForPhysicalType(const ColumnBlock& block,
SelectionVector* sel) const;
- // Merge another predicate into this InList predicate.
- void MergeIntoInList(const ColumnPredicate& other);
+  // Evaluates one cell against an InBloomFilter predicate without re-checking
+  // the predicate type: the cell must pass every bloom filter and lie within
+  // the optional bounds (lower inclusive, upper exclusive).
+  template <DataType PhysicalType>
+  bool EvaluateCellForBloomFilter(const void* cell) const {
+    // BINARY cells hold a Slice pointing at the payload; every other physical
+    // type is hashed by the raw bytes of its cpp_type.
+    Slice key;
+    if (PhysicalType == BINARY) {
+      key = *reinterpret_cast<const Slice*>(cell);
+    } else {
+      using cpp_type = typename DataTypeTraits<PhysicalType>::cpp_type;
+      key = Slice(reinterpret_cast<const uint8_t*>(cell), sizeof(cpp_type));
+    }
+
+    // A definite miss in any filter rejects the cell.
+    for (const auto& filter : bloom_filters_) {
+      const BloomKeyProbe probe(key, filter.hash_algorithm());
+      BloomFilter reader(filter.bloom_data(), filter.nhash());
+      if (!reader.MayContainKey(probe)) {
+        return false;
+      }
+    }
+
+    if (upper_ != nullptr &&
+        DataTypeTraits<PhysicalType>::Compare(cell, this->upper_) >= 0) {
+      return false;
+    }
+    if (lower_ != nullptr &&
+        DataTypeTraits<PhysicalType>::Compare(cell, this->lower_) < 0) {
+      return false;
+    }
+    return true;
+  }
// For a Range type predicate, this helper function checks
// whether a given value is in the range.
@@ -284,6 +405,10 @@ class ColumnPredicate {
// whether a given value is in the list.
bool CheckValueInList(const void* value) const;
+ // For an InBloomFilter type predicate, this helper function checks
+ // whether a given value satisfies the predicate (its bloom filters and any bounds).
+ bool CheckValueInBloomFilter(const void* value) const;
+
// The type of this predicate.
PredicateType predicate_type_;
@@ -299,6 +424,9 @@ class ColumnPredicate {
// The list of values to check column against if this is an InList predicate.
std::vector<const void*> values_;
+
+ // The list of bloom filter in this predicate.
+ std::vector<BloomFilterInner> bloom_filters_;
};
// Compares predicates according to selectivity. Predicates that match fewer
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/common.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index b3d323e..f0e9a33 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -285,6 +285,13 @@ enum ReplicaSelection {
CLOSEST_REPLICA = 2;
}
+// The hash algorithm used by bloom filters (ColumnPredicatePB.BloomFilter) and
+// for calculating hash buckets (PartitionSchemaPB).
+enum HashAlgorithm {
+ // Zero value: what an enum field without an explicit default reads as when unset.
+ UNKNOWN_HASH = 0;
+ MURMUR_HASH_2 = 1;
+ // The declared default for ColumnPredicatePB.BloomFilter.hash_algorithm.
+ CITY_HASH = 2;
+}
+
// The serialized format of a Kudu table partition schema.
message PartitionSchemaPB {
@@ -319,11 +326,6 @@ message PartitionSchemaPB {
// input.
optional uint32 seed = 3;
- enum HashAlgorithm {
- UNKNOWN = 0;
- MURMUR_HASH_2 = 1;
- }
-
// The hash algorithm to use for calculating the hash bucket.
optional HashAlgorithm hash_algorithm = 4;
}
@@ -348,6 +350,15 @@ message ColumnPredicatePB {
// The predicate column name.
optional string column = 1;
+ // A single serialized bloom filter.
+ message BloomFilter {
+ // The number of hash functions applied to each key.
+ optional int32 nhash = 1;
+ // The bloom filter bitmap.
+ optional bytes bloom_data = 2 [(kudu.REDACT) = true];
+ // The hash algorithm used to probe this filter; it should match the algorithm
+ // the filter was built with.
+ optional HashAlgorithm hash_algorithm = 3 [default = CITY_HASH];
+ }
+
message Range {
// Bounds should be encoded as follows:
@@ -357,8 +368,7 @@ message ColumnPredicatePB {
//
// Note that this predicate type should not be used for NULL data --
// NULL is defined to neither be greater than or less than other values
- // for the comparison operator. We will eventually add a special
- // predicate type for null-ness.
+ // for the comparison operator.
// The inclusive lower bound.
optional bytes lower = 1 [(kudu.REDACT) = true];
@@ -383,12 +393,28 @@ message ColumnPredicatePB {
message IsNull {}
+ message InBloomFilter {
+ // Bloom filters for the column (at least one); a value must match all of them.
+ repeated BloomFilter bloom_filters = 1;
+
+ // 'lower' and 'upper' are optional. When an InBloomFilter and a Range
+ // predicate on the same column are merged, the result can be an
+ // InBloomFilter that carries the range bounds; they have the same
+ // meaning as the bounds of a Range predicate.
+ // The inclusive lower bound.
+ optional bytes lower = 2 [(kudu.REDACT) = true];
+
+ // The exclusive upper bound.
+ optional bytes upper = 3 [(kudu.REDACT) = true];
+ }
+
oneof predicate {
Range range = 2;
Equality equality = 3;
IsNotNull is_not_null = 4;
InList in_list = 5;
IsNull is_null = 6;
+ InBloomFilter in_bloom_filter = 7;
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/key_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/key_util.cc b/src/kudu/common/key_util.cc
index ed8dd86..ec98dec 100644
--- a/src/kudu/common/key_util.cc
+++ b/src/kudu/common/key_util.cc
@@ -231,6 +231,7 @@ int PushUpperBoundKeyPredicates(ColIdxIter first,
memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_lower(), size);
pushed_predicates++;
break;
+ case PredicateType::InBloomFilter: // Upper in InBloomFilter processed as upper in Range.
case PredicateType::Range:
if (predicate->raw_upper() != nullptr) {
memcpy(row->mutable_cell_ptr(*col_idx_it), predicate->raw_upper(), size);
@@ -297,6 +298,7 @@ int PushLowerBoundKeyPredicates(ColIdxIter first,
size_t size = column.type_info()->size();
switch (predicate->predicate_type()) {
+ case PredicateType::InBloomFilter: // Lower in InBloomFilter processed as lower in Range.
case PredicateType::Range:
if (predicate->raw_lower() == nullptr) {
break_loop = true;
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/scan_spec.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index f5caab8..0660c10 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -193,6 +193,10 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
// InList predicates should not be removed as the full constraints imposed by an InList
// cannot be translated into only a single set of lower and upper bound primary keys
break;
+ } else if (type == PredicateType::InBloomFilter) {
+ // InBloomFilter predicates should not be removed as the full constraints imposed by bloom
+ // filters cannot be translated into only a single set of lower and upper bound primary keys
+ break;
} else {
LOG(FATAL) << "Can not remove unknown predicate type";
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index 3d06b22..9c11de3 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/common/wire_protocol.h"
+
#include <cstddef>
#include <cstdint>
+#include <memory>
#include <string>
#include <vector>
@@ -29,9 +32,10 @@
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
-#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/port.h"
#include "kudu/util/bitmap.h"
+#include "kudu/util/bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/memory/arena.h"
@@ -43,6 +47,7 @@
#include "kudu/util/test_util.h"
using std::string;
+using std::unique_ptr;
using std::vector;
namespace kudu {
@@ -499,4 +504,125 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument());
}
}
+
+class BFWireProtocolTest : public KuduTest {
+ public:
+  BFWireProtocolTest()
+      : schema_({ ColumnSchema("col1", INT32) }, 1), arena_(1024), n_keys_(100) {
+    bfb1_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01)));
+    bfb2_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01)));
+  }
+
+  // Checks the sizing of both builders, then adds keys 0..n_keys_-1 to each of them.
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    double expected_fp_rate1 = bfb1()->false_positive_rate();
+    ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
+    ASSERT_EQ(9, bfb1()->n_bits() / n_keys_);
+    double expected_fp_rate2 = bfb2()->false_positive_rate();
+    ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
+    ASSERT_EQ(9, bfb2()->n_bits() / n_keys_);
+    for (int i = 0; i < n_keys_; ++i) {
+      Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i));
+      BloomKeyProbe probe(key_slice, MURMUR_HASH_2);
+      bfb1()->AddKey(probe);
+      bfb2()->AddKey(probe);
+    }
+  }
+
+  // Accessors for the two builders; each one returns its own, distinct builder.
+  BloomFilterBuilder* bfb1() const { return bfb1_.get(); }
+  BloomFilterBuilder* bfb2() const { return bfb2_.get(); }
+
+ protected:
+  Schema schema_;
+  Arena arena_;
+  int n_keys_;
+  unique_ptr<BloomFilterBuilder> bfb1_;
+  unique_ptr<BloomFilterBuilder> bfb2_;
+};
+
+TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) {
+  boost::optional<ColumnPredicate> predicate;
+  const ColumnSchema& col1 = schema_.column(0);
+  {  // Round-trip a predicate holding one bloom filter.
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, nullptr, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(predicate, original);
+  }
+
+  {  // Round-trip a predicate holding two bloom filters.
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    filters.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, nullptr, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(predicate, original);
+  }
+}
+
+TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) {
+  boost::optional<ColumnPredicate> predicate;
+  const ColumnSchema& col1 = schema_.column(0);
+  {  // One bloom filter plus an inclusive lower bound.
+    int32_t lower = 1;
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, &lower, nullptr);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(predicate, original);
+  }
+
+  {  // One bloom filter plus an exclusive upper bound.
+    int32_t upper = 4;
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, nullptr, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(predicate, original);
+  }
+
+  {  // One bloom filter with both bounds.
+    int32_t lower = 1;
+    int32_t upper = 4;
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, &lower, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(predicate, original);
+  }
+
+  {  // Two bloom filters with both bounds.
+    int32_t lower = 1;
+    int32_t upper = 4;
+    vector<ColumnPredicate::BloomFilterInner> filters;
+    filters.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2);
+    filters.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2);
+    auto original = ColumnPredicate::InBloomFilter(col1, &filters, &lower, &upper);
+    ColumnPredicatePB pb;
+    NO_FATALS(ColumnPredicateToPB(original, &pb));
+    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate));
+    ASSERT_EQ(PredicateType::InBloomFilter, predicate->predicate_type());
+    ASSERT_EQ(original.bloom_filters().size(), predicate->bloom_filters().size());
+    ASSERT_EQ(predicate, original);
+  }
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/common/wire_protocol.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc
index 0a5ce2a..2aab6f0 100644
--- a/src/kudu/common/wire_protocol.cc
+++ b/src/kudu/common/wire_protocol.cc
@@ -409,6 +409,16 @@ void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, stri
bound_dst->assign(reinterpret_cast<const char*>(src), size);
}
+// Serializes 'bf_src' into 'bf_dst': the hash count, a copy of the bitmap bytes,
+// and the hash algorithm used to probe keys.
+void CopyPredicateBloomFilterToPB(const ColumnPredicate::BloomFilterInner& bf_src,
+                                  ColumnPredicatePB::BloomFilter* bf_dst) {
+  bf_dst->set_nhash(bf_src.nhash());
+  const auto& bitmap = bf_src.bloom_data();
+  bf_dst->set_bloom_data(bitmap.data(), bitmap.size());
+  bf_dst->set_hash_algorithm(bf_src.hash_algorithm());
+}
+
// Extract a void* pointer suitable for use in a ColumnRangePredicate from the
// string protobuf bound. This validates that the pb_value has the correct
// length, copies the data into 'arena', and sets *result to point to it.
@@ -439,6 +449,21 @@ Status CopyPredicateBoundFromPB(const ColumnSchema& schema,
return Status::OK();
}
+
+// Copies 'bf_src' into 'bf_dst' (bitmap bytes go to 'arena'); rejects empty data or nhash <= 0.
+Status CopyPredicateBloomFilterFromPB(const ColumnPredicatePB::BloomFilter& bf_src,
+                                      ColumnPredicate::BloomFilterInner* bf_dst, Arena* arena) {
+  const string& bloom_data = bf_src.bloom_data();
+  if (bloom_data.empty() || bf_src.nhash() <= 0) {  // Unusable: no bits, or no hash probes.
+    return Status::InvalidArgument("Invalid bloom filter: empty bitmap or non-positive nhash");
+  }
+  uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(bloom_data.size()));
+  memcpy(data_copy, bloom_data.data(), bloom_data.size());
+  *bf_dst = ColumnPredicate::BloomFilterInner(Slice(data_copy, bloom_data.size()),
+                                              bf_src.nhash(), bf_src.hash_algorithm());
+  return Status::OK();
+}
+
} // anonymous namespace
void ColumnPredicateToPB(const ColumnPredicate& predicate,
@@ -481,6 +506,25 @@ void ColumnPredicateToPB(const ColumnPredicate& predicate,
return;
};
case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf";
+ case PredicateType::InBloomFilter: {
+ auto* bloom_filter_pred = pb->mutable_in_bloom_filter();
+ for (const auto& bf : predicate.bloom_filters()) {
+ ColumnPredicatePB::BloomFilter* bloom_filter = bloom_filter_pred->add_bloom_filters();
+ CopyPredicateBloomFilterToPB(bf, bloom_filter);
+ }
+ // Form the optional lower and upper bound.
+ if (predicate.raw_lower() != nullptr) {
+ CopyPredicateBoundToPB(predicate.column(),
+ predicate.raw_lower(),
+ bloom_filter_pred->mutable_lower());
+ }
+ if (predicate.raw_upper() != nullptr) {
+ CopyPredicateBoundToPB(predicate.column(),
+ predicate.raw_upper(),
+ bloom_filter_pred->mutable_upper());
+ }
+ return;
+ }
}
LOG(FATAL) << "unknown predicate type";
}
@@ -546,9 +590,40 @@ Status ColumnPredicateFromPB(const Schema& schema,
break;
};
case ColumnPredicatePB::kIsNull: {
- *predicate = ColumnPredicate::IsNull(col);
- break;
+ *predicate = ColumnPredicate::IsNull(col);
+ break;
+ };
+ case ColumnPredicatePB::kInBloomFilter: {
+ const auto& in_bloom_filter = pb.in_bloom_filter();
+ vector<ColumnPredicate::BloomFilterInner> bloom_filters;
+ if (in_bloom_filter.bloom_filters_size() == 0) {
+ return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
+ "no bloom filter contained", col.name());
+ }
+ for (const auto& bf : in_bloom_filter.bloom_filters()) {
+ if (!bf.has_nhash()
+ || !bf.has_bloom_data()
+ || !bf.has_hash_algorithm()
+ || bf.hash_algorithm() == UNKNOWN_HASH) {
+ return Status::InvalidArgument("Invalid in bloom filter predicate on column: "
+ "missing bloom filter details", col.name());
+ }
+ ColumnPredicate::BloomFilterInner bloom_filter;
+ RETURN_NOT_OK(CopyPredicateBloomFilterFromPB(bf, &bloom_filter, arena));
+ bloom_filters.emplace_back(bloom_filter);
+ }
+ // Extract the optional lower and upper bound.
+ const void* lower = nullptr;
+ const void* upper = nullptr;
+ if (in_bloom_filter.has_lower()) {
+ RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.lower(), arena, &lower));
}
+ if (in_bloom_filter.has_upper()) {
+ RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.upper(), arena, &upper));
+ }
+ *predicate = ColumnPredicate::InBloomFilter(col, &bloom_filters, lower, upper);
+ break;
+ };
default: return Status::InvalidArgument("Unknown predicate type for column", col.name());
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/tablet/cfile_set-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index cef4b71..4ccdb17 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include "kudu/tablet/cfile_set.h"
+
+#include <algorithm>
#include <cstddef>
#include <cstdint>
+#include <iterator>
#include <memory>
#include <ostream>
#include <string>
@@ -24,8 +28,8 @@
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
-#include <gtest/gtest.h>
#include <glog/logging.h>
+#include <gtest/gtest.h>
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/column_predicate.h"
@@ -44,13 +48,13 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/stringpiece.h"
-#include "kudu/tablet/cfile_set.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/util/auto_release_pool.h"
#include "kudu/util/bloom_filter.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/memory/arena.h"
+#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -100,6 +104,75 @@ class TestCFileSet : public KuduRowSetTest {
ASSERT_OK(rsw.Finish());
}
+  // Fills the four bloom filters used by TestBloomFilterPredicates with Int32 keys,
+  // hashed with MURMUR_HASH_2. Row i contributes key 2*i to the column 0 filters and
+  // key 10*i to the column 1 filters; even rows go to the "contain" filters and odd
+  // rows go to the "exclude" filters:
+  //   bf1_contain / bf2_contain: rows 0, 2, 4, ...
+  //   bf1_exclude / bf2_exclude: rows 1, 3, 5, ...
+  void FillBloomFilter(int nrows,
+                       BloomFilterBuilder* bf1_contain,
+                       BloomFilterBuilder* bf1_exclude,
+                       BloomFilterBuilder* bf2_contain,
+                       BloomFilterBuilder* bf2_exclude) {
+    for (int i = 0; i < nrows; ++i) {
+      // Each probe refers to its key's memory, so the keys are declared first.
+      const int32_t curr1 = i * 2;
+      const int32_t curr2 = i * 10;
+      Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1));
+      Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2));
+      BloomKeyProbe probe1(first, MURMUR_HASH_2);
+      BloomKeyProbe probe2(second, MURMUR_HASH_2);
+
+      // Alternate rows between the "contain" and "exclude" filters.
+      if (i % 2 == 0) {
+        bf1_contain->AddKey(probe1);
+        bf2_contain->AddKey(probe2);
+      } else {
+        bf1_exclude->AddKey(probe1);
+        bf2_exclude->AddKey(probe2);
+      }
+    }
+  }
+
+  // Records, for each filter built by FillBloomFilter(), the rows whose key hits it:
+  // row i is probed with 2*i against the column 0 filters and 10*i against the column 1
+  // filters, and ret<k>_<kind> collects the hits of bf<k>_<kind>. False positives can make
+  // a row hit both its "contain" and "exclude" filter, so these lists are the exact hits.
+  void GetBloomFilterResult(int nrows, BloomFilterBuilder* bf1_contain,
+                            BloomFilterBuilder* bf1_exclude,
+                            BloomFilterBuilder* bf2_contain,
+                            BloomFilterBuilder* bf2_exclude,
+                            vector<size_t>* ret1_contain,
+                            vector<size_t>* ret1_exclude,
+                            vector<size_t>* ret2_contain,
+                            vector<size_t>* ret2_exclude) {
+    BloomFilter f1_contain(bf1_contain->slice(), bf1_contain->n_hashes());
+    BloomFilter f1_exclude(bf1_exclude->slice(), bf1_exclude->n_hashes());
+    BloomFilter f2_contain(bf2_contain->slice(), bf2_contain->n_hashes());
+    BloomFilter f2_exclude(bf2_exclude->slice(), bf2_exclude->n_hashes());
+    for (int i = 0; i < nrows; ++i) {
+      const int32_t curr1 = i * 2;
+      const int32_t curr2 = i * 10;
+      Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1));
+      Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2));
+      BloomKeyProbe probe1(first, MURMUR_HASH_2);
+      BloomKeyProbe probe2(second, MURMUR_HASH_2);
+      if (f1_contain.MayContainKey(probe1)) {
+        ret1_contain->push_back(i);
+      }
+      if (f1_exclude.MayContainKey(probe1)) {
+        ret1_exclude->push_back(i);
+      }
+      if (f2_contain.MayContainKey(probe2)) {
+        ret2_contain->push_back(i);
+      }
+      if (f2_exclude.MayContainKey(probe2)) {
+        ret2_exclude->push_back(i);
+      }
+    }
+  }
+
// Issue a range scan between 'lower' and 'upper', and verify that all result
// rows indeed fall inside that predicate.
void DoTestRangeScan(const shared_ptr<CFileSet> &fileset,
@@ -135,6 +208,47 @@ class TestCFileSet : public KuduRowSetTest {
}
}
+  // Scans 'fileset' with all of 'predicates' and verifies that the selected rows are
+  // exactly the rows listed in 'target'. Row indices count from the first row returned
+  // by the iterator; assumes the scan starts at the rowset's first row (no key seek).
+  void DoTestBloomFilterScan(const shared_ptr<CFileSet>& fileset,
+                             const vector<ColumnPredicate>& predicates,
+                             vector<size_t> target) {
+    LOG(INFO) << "predicates size: " << predicates.size();
+    LOG(INFO) << "Target size: " << target.size();
+    // Create a materializing iterator over the rowset.
+    shared_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_));
+    gscoped_ptr<RowwiseIterator> iter(new MaterializingIterator(cfile_iter));
+    // Attach every predicate to the scan spec.
+    ScanSpec spec;
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+    ASSERT_OK(iter->Init(&spec));
+    // Each selected row must be expected; matched rows are removed from 'target'.
+    Arena arena(1024);
+    RowBlock block(schema_, 100, &arena);
+    size_t row_offset = 0;  // Index of the current block's first row within the scan.
+    size_t selected = 0;
+    while (iter->HasNext()) {
+      ASSERT_OK_FAST(iter->NextBlock(&block));
+      for (size_t i = 0; i < block.nrows(); i++) {
+        if (!block.selection_vector()->IsRowSelected(i)) continue;
+        RowBlockRow row = block.row(i);
+        const size_t index = row_offset + i;
+        auto it = std::find(target.begin(), target.end(), index);
+        ASSERT_TRUE(it != target.end()) << "Row " << schema_.DebugRow(row) << " should not have "
+                                        << "passed predicate ";
+        target.erase(it);
+        selected++;
+      }
+      row_offset += block.nrows();
+    }
+    LOG(INFO) << "Selected size: " << selected;
+    ASSERT_TRUE(target.empty()) << "Target size " << target.size() << " should have "
+                                << "passed predicate ";
+  }
+
Status MaterializeColumn(CFileSet::Iterator *iter,
size_t col_idx,
ColumnBlock *cb) {
@@ -349,6 +463,99 @@ TEST_F(TestCFileSet, TestRangePredicates2) {
DoTestRangeScan(fileset, kNumRows * 10, kNoBound);
}
+TEST_F(TestCFileSet, TestBloomFilterPredicates) {
+  const int kNumRows = 100;
+  BloomFilterBuilder bfb1_contain(
+      BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate1 = bfb1_contain.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002);
+  ASSERT_EQ(9, bfb1_contain.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb1_exclude(
+      BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate11 = bfb1_exclude.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate11, 0.01, 0.002);
+  ASSERT_EQ(9, bfb1_exclude.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb2_contain(
+      BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate2 = bfb2_contain.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002);
+  ASSERT_EQ(9, bfb2_contain.n_bits() / kNumRows);
+
+  BloomFilterBuilder bfb2_exclude(
+      BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01));
+  double expected_fp_rate22 = bfb2_exclude.false_positive_rate();
+  ASSERT_NEAR(expected_fp_rate22, 0.01, 0.002);
+  ASSERT_EQ(9, bfb2_exclude.n_bits() / kNumRows);
+
+  NO_FATALS(WriteTestRowSet(kNumRows));
+  vector<size_t> ret1_contain;
+  vector<size_t> ret1_exclude;
+  vector<size_t> ret2_contain;
+  vector<size_t> ret2_exclude;
+  FillBloomFilter(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude);
+  GetBloomFilterResult(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude,
+                       &ret1_contain, &ret1_exclude, &ret2_contain, &ret2_exclude);
+
+  shared_ptr<CFileSet> fileset;
+  ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), &fileset));
+
+  vector<ColumnPredicate::BloomFilterInner> bfs;
+  // Column 0 against the filter built from the even rows.
+  ColumnPredicate::BloomFilterInner bf1_contain(bfb1_contain.slice(),
+                                                bfb1_contain.n_hashes(), MURMUR_HASH_2);
+  bfs.push_back(bf1_contain);
+  auto pred1_contain = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, nullptr, nullptr);
+  NO_FATALS(DoTestBloomFilterScan(fileset, { pred1_contain }, ret1_contain));
+
+  // Column 1 against the filter built from the even rows.
+  ColumnPredicate::BloomFilterInner bf2_contain(bfb2_contain.slice(),
+                                                bfb2_contain.n_hashes(), MURMUR_HASH_2);
+  bfs.clear();
+  bfs.push_back(bf2_contain);
+  auto pred2_contain = ColumnPredicate::InBloomFilter(schema_.column(1), &bfs, nullptr, nullptr);
+  NO_FATALS(DoTestBloomFilterScan(fileset, { pred2_contain }, ret2_contain));
+
+  // Column 0 against both its "contain" and "exclude" filters: a row must hit both.
+  ColumnPredicate::BloomFilterInner bf1_exclude(bfb1_exclude.slice(),
+                                                bfb1_exclude.n_hashes(), MURMUR_HASH_2);
+  bfs.clear();
+  bfs.push_back(bf1_contain);
+  bfs.push_back(bf1_exclude);
+  vector<size_t> ret1_contain_exclude;
+  auto pred1_contain_exclude = ColumnPredicate::InBloomFilter(schema_.column(0),
+                                                              &bfs, nullptr, nullptr);
+  std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret1_exclude.begin(),
+                        ret1_exclude.end(), std::back_inserter(ret1_contain_exclude));
+  NO_FATALS(DoTestBloomFilterScan(fileset, { pred1_contain_exclude }, ret1_contain_exclude));
+  // Column 0 "contain" filter combined with the column 1 "contain" filter.
+  vector<size_t> ret12_contain_contain;
+  std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret2_contain.begin(),
+                        ret2_contain.end(), std::back_inserter(ret12_contain_contain));
+  NO_FATALS(DoTestBloomFilterScan(fileset, {pred1_contain, pred2_contain}, ret12_contain_contain));
+
+  // Column 0 "contain" filter combined with a separate Range predicate [8, 58).
+  int32_t lower = 8;
+  int32_t upper = 58;
+  int32_t lower_row_index = lower / 2;
+  int32_t upper_row_index = upper / 2;
+  vector<size_t> ret1_contain_range = ret1_contain;
+  vector<size_t>::iterator left = std::lower_bound(ret1_contain_range.begin(),
+                                                   ret1_contain_range.end(), lower_row_index);
+  ret1_contain_range.erase(ret1_contain_range.begin(), left);  // keep rows >= lower_row_index
+  vector<size_t>::iterator right = std::lower_bound(ret1_contain_range.begin(),
+                                                    ret1_contain_range.end(), upper_row_index);
+  ret1_contain_range.erase(right, ret1_contain_range.end());  // drop rows >= upper_row_index
+  auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper);
+  NO_FATALS(DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range));
+
+  // The same filter and bounds expressed as a single InBloomFilter predicate.
+  bfs.clear();
+  bfs.push_back(bf1_contain);
+  auto bf_with_range = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, &lower, &upper);
+  NO_FATALS(DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range));
+}
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/8af288a2/src/kudu/util/bloom_filter.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/bloom_filter.h b/src/kudu/util/bloom_filter.h
index ad4e3eb..0905fda 100644
--- a/src/kudu/util/bloom_filter.h
+++ b/src/kudu/util/bloom_filter.h
@@ -20,11 +20,13 @@
#include <cstddef>
#include <cstdint>
+#include "kudu/common/common.pb.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/util/bitmap.h"
+#include "kudu/util/hash_util.h"
#include "kudu/util/slice.h"
namespace kudu {
@@ -52,11 +54,22 @@ class BloomKeyProbe {
//
// NOTE: proper operation requires that the referenced memory remain
// valid for the lifetime of this object.
- explicit BloomKeyProbe(const Slice &key) : key_(key) {
- uint64_t h = util_hash::CityHash64(
- reinterpret_cast<const char *>(key.data()),
- key.size());
-
+ explicit BloomKeyProbe(const Slice &key, HashAlgorithm hash_algorithm = CITY_HASH)
+ : key_(key) {
+ uint64_t h = 0;
+ switch (hash_algorithm) {
+ case MURMUR_HASH_2:
+ h = HashUtil::MurmurHash2_64(
+ reinterpret_cast<const char *>(key.data()),
+ key.size(),
+ /*seed=*/0);
+ break;
+ case CITY_HASH:
+ default:
+ h = util_hash::CityHash64(
+ reinterpret_cast<const char *>(key.data()),
+ key.size());
+ }
// Use the top and bottom halves of the 64-bit hash
// as the two independent hash functions for mixing.
h_1_ = static_cast<uint32_t>(h);