You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2020/03/19 16:51:37 UTC

[kudu] branch master updated (a80d747 -> c0e922d)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from a80d747  [util] KUDU-3067 add OpenStack metadata detector
     new 74e0be2  [client] Add C++ API to accept BlockBloomFilter directly
     new c0e922d  [client] Mark the Bloom filter predicate method as experimental

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/client.cc                 |  44 ++++++-
 src/kudu/client/client.h                  |  41 +++++++
 src/kudu/client/predicate-test.cc         | 198 ++++++++++++++++++++++--------
 src/kudu/client/scan_predicate-internal.h |  47 +++++++
 src/kudu/client/scan_predicate.cc         |  51 +++++++-
 5 files changed, 324 insertions(+), 57 deletions(-)


[kudu] 02/02: [client] Mark the Bloom filter predicate method as experimental

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c0e922d24d451fed66e309c954900d7035856c9a
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Mar 18 16:03:53 2020 -0700

    [client] Mark the Bloom filter predicate method as experimental
    
    This is the method meant for general consumption by C++ client
    users. The method that directly accepts BlockBloomFilter
    is already marked Advanced/Unstable.
    
    This allows changes in future in case we need to incorporate
    feedback discovered when testing integration with Impala
    
    Tests:
    - Verified the generated doxygen HTML file.
    
    Change-Id: Ie1572228dc041d2e5cba68ee47afbb082752b759
    Reviewed-on: http://gerrit.cloudera.org:8080/15486
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/client.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 87dd697..582abc9 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1089,6 +1089,8 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
   /// Create a new IN Bloom filter predicate which can be used for scanners on
   /// this table.
   ///
+  /// @note This method is experimental and may change or disappear in future.
+  ///
   /// A Bloom filter is a space-efficient probabilistic data structure used to
   /// test set membership with a possibility of false positive matches.
   /// See @c KuduBloomFilter for creating Bloom filters.


[kudu] 01/02: [client] Add C++ API to accept BlockBloomFilter directly

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 74e0be274da1df2653f7a72175b3e30e0c7a527e
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Mar 11 12:10:38 2020 -0700

    [client] Add C++ API to accept BlockBloomFilter directly
    
    For performance reasons, ability to use its own allocator etc.,
    callers may choose to supply BlockBloomFilter directly
    instead of using client::KuduBloomFilter. Case in point Impala.
    
    The allocator and BlockBloomFilter are accepted as Slice
    parameters that wrap the raw pointers.
    
    This API is marked Advanced/Unstable as supplying incorrect/incompatible
    parameters can lead to undefined results and it could change in
    future in backward incompatible manner.
    
    Change-Id: Iea85e3da8fe55f78e49a80439e6af723aa3d970b
    Reviewed-on: http://gerrit.cloudera.org:8080/15424
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 src/kudu/client/client.cc                 |  44 ++++++-
 src/kudu/client/client.h                  |  39 ++++++
 src/kudu/client/predicate-test.cc         | 198 ++++++++++++++++++++++--------
 src/kudu/client/scan_predicate-internal.h |  47 +++++++
 src/kudu/client/scan_predicate.cc         |  51 +++++++-
 5 files changed, 322 insertions(+), 57 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8fdc40b..c3cb8e6 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -92,6 +92,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/init.h"
 #include "kudu/util/logging.h"
@@ -971,7 +972,7 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name,
 }
 
 KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
-                                                    std::vector<KuduBloomFilter*>* bloom_filters) {
+                                                    vector<KuduBloomFilter*>* bloom_filters) {
   // We always take ownership of values; this ensures cleanup if the predicate is invalid.
   auto cleanup = MakeScopedCleanup([&]() {
     STLDeleteElements(bloom_filters);
@@ -1004,6 +1005,47 @@ KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
   });
 }
 
+KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
+                                                    const Slice& allocator,
+                                                    const vector<Slice>& bloom_filters) {
+  // Empty vector of bloom filters will select all rows. Hence disallowed.
+  if (bloom_filters.empty()) {
+    return new KuduPredicate(
+        new ErrorPredicateData(Status::InvalidArgument("No Bloom filters supplied")));
+  }
+
+  // In this case, the Block Bloom filters and allocator are supplied as opaque pointers
+  // and the predicate will convert them to well-defined pointer types
+  // but will NOT take ownership of those pointers. Hence a custom deleter,
+  // DirectBloomFilterDataDeleter, is used that gives control over ownership.
+
+  // Extract the allocator.
+  auto* bbf_allocator =
+      // const_cast<> can be avoided with mutable_data() on a Slice. However mutable_data() is a
+      // non-const function which can't be used with the const allocator Slice.
+      // Same for BlockBloomFilter below.
+      reinterpret_cast<BlockBloomFilterBufferAllocatorIf*>(const_cast<uint8_t*>(allocator.data()));
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> bbf_allocator_shared(
+      bbf_allocator,
+      DirectBloomFilterDataDeleter<BlockBloomFilterBufferAllocatorIf>(false /*owned*/));
+
+  // Extract the Block Bloom filters.
+  vector<DirectBlockBloomFilterUniqPtr> bbf_vec;
+  for (const auto& bf_slice : bloom_filters) {
+    auto* bbf =
+        reinterpret_cast<BlockBloomFilter*>(const_cast<uint8_t*>(bf_slice.data()));
+    DirectBlockBloomFilterUniqPtr bf_uniq_ptr(
+        bbf, DirectBloomFilterDataDeleter<BlockBloomFilter>(false /*owned*/));
+    bbf_vec.emplace_back(std::move(bf_uniq_ptr));
+  }
+
+  return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) {
+    return new KuduPredicate(
+        new InDirectBloomFilterPredicateData(col_schema, std::move(bbf_allocator_shared),
+                                             std::move(bbf_vec)));
+  });
+}
+
 KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
                                              vector<KuduValue*>* values) {
   // We always take ownership of values; this ensures cleanup if the predicate is invalid.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index dc4ad95..87dd697 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1113,6 +1113,45 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
   KuduPredicate* NewInBloomFilterPredicate(const Slice& col_name,
                                            std::vector<KuduBloomFilter*>* bloom_filters);
 
+  /// @name Advanced/Unstable API
+  ///
+  /// There are no guarantees on the stability of this client API.
+  ///
+  ///@{
+  /// Create a new IN Bloom filter predicate using direct BlockBloomFilter
+  /// pointers which can be used for scanners on this table.
+  ///
+  /// A Bloom filter is a space-efficient probabilistic data structure used to
+  /// test set membership with a possibility of false positive matches.
+  ///
+  /// IN list predicate can be used with small number of values; on the other
+  /// hand with IN Bloom filter predicate large number of values can be tested
+  /// for membership in a space-efficient manner.
+  ///
+  /// @param [in] col_name
+  ///   Name of the column to which the predicate applies.
+  /// @param [in] allocator
+  ///   Pointer to the BlockBloomFilterBufferAllocatorIf allocator used with
+  ///   Bloom filters.
+  /// @param bloom_filters
+  ///   Vector of BlockBloomFilter pointers that contain the values inserted to
+  ///   match against the column. The column value must match against all the
+  ///   supplied Bloom filters to be returned by the scanner. On return,
+  ///   regardless of success or error, the returned predicate will NOT take
+  ///   ownership of the pointers contained in @c bloom_filters and caller is
+  ///   responsible for the lifecycle management of the Bloom filters.
+  ///   The supplied Bloom filters are expected to remain valid for the
+  ///   lifetime of the KuduScanner.
+  /// @return Raw pointer to an IN Bloom filter predicate. The caller owns the
+  ///   predicate until it is passed into KuduScanner::AddConjunctPredicate().
+  ///   In the case of an error (e.g. an invalid column name),
+  ///   a non-NULL value is still returned. The error will be returned when
+  ///   attempting to add this predicate to a KuduScanner.
+  KuduPredicate* NewInBloomFilterPredicate(const Slice& col_name,
+                                           const Slice& allocator,
+                                           const std::vector<Slice>& bloom_filters);
+  ///@}
+
   /// Create a new IN list predicate which can be used for scanners on this
   /// table.
   ///
diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc
index 150775a..eeb84f6 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cmath>
+#include <cstddef>
 #include <cstdint>
 #include <initializer_list>
 #include <iterator>
@@ -44,7 +45,9 @@
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/decimal_util.h"
+#include "kudu/util/hash.pb.h"
 #include "kudu/util/int128.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
@@ -326,6 +329,16 @@ class PredicateTest : public KuduTest {
     return bf;
   }
 
+  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilter(
+      int nkeys,
+      float false_positive_prob = kBloomFilterFalsePositiveProb) {
+    unique_ptr<BlockBloomFilter> bf(
+        new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
+    CHECK_OK(bf->Init(BlockBloomFilter::MinLogSpace(nkeys, false_positive_prob),
+                      kudu::FAST_HASH, 0));
+    return bf;
+  }
+
   void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
                                    KuduBloomFilter* in_bloom_filter,
                                    int expected_count) {
@@ -1260,23 +1273,9 @@ TEST_F(PredicateTest, TestDecimalPredicates) {
 }
 
 class BloomFilterPredicateTest : public PredicateTest {
+ public:
+  BloomFilterPredicateTest() : rand_(Random(SeedRandom())) {}
  protected:
-  template<class Collection>
-  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
-    auto* bloom_filter = CreateBloomFilter(values.size());
-    for (const auto& v : values) {
-      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
-    }
-    return bloom_filter;
-  }
-};
-
-TEST_F(BloomFilterPredicateTest, TestBloomFilterPredicate) {
-  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32);
-  shared_ptr<KuduSession> session = CreateSession();
-
-  auto seed = SeedRandom();
-  Random rand(seed);
   // Number of values to be written to the table.
   static constexpr int kNumAllValues = 100000;
   // Subset of values from the table that'll be inserted in BloomFilter and searched against
@@ -1287,58 +1286,155 @@ TEST_F(BloomFilterPredicateTest, TestBloomFilterPredicate) {
   // Number of false positives based on the number of values that'll be searched against.
   static constexpr int kFalsePositives = kNumAllValues * kBloomFilterFalsePositiveProb;
 
-  const unordered_set<int32_t> empty_set;
-  auto all_values = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, empty_set, &rand);
-  auto min_max_pair = std::minmax_element(all_values.begin(), all_values.end());
-  vector<int32_t> inclusive_values;
-  ReservoirSample(all_values, kNumInclusiveValues, empty_set, &rand, &inclusive_values);
-  auto* inclusive_bf = CreateBloomFilterWithValues(inclusive_values);
-  auto exclusive_values = CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values,
-                                                              &rand);
-  auto* exclusive_bf = CreateBloomFilterWithValues(exclusive_values);
+  shared_ptr<KuduTable> table_;
+  shared_ptr<KuduSession> session_;
+  Random rand_;
+  unordered_set<int32_t> all_values_;
+  int32_t min_value_, max_value_;
+  // Subset of "all_values_".
+  vector<int32_t> inclusive_values_;
+  // Values that are not contained in "all_values_".
+  unordered_set<int32_t> exclusive_values_;
 
-  int i = 0;
-  for (auto value : all_values) {
-    unique_ptr<KuduInsert> insert(table->NewInsert());
-    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
-    ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
-    ASSERT_OK(session->Apply(insert.release()));
+  void SetUp() override {
+    PredicateTest::SetUp();
+
+    table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
+    session_ = CreateSession();
+
+    const unordered_set<int32_t> empty_set;
+    all_values_ = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, empty_set, &rand_);
+    auto min_max_pair = std::minmax_element(all_values_.begin(), all_values_.end());
+    min_value_ = *min_max_pair.first;
+    max_value_ = *min_max_pair.second;
+    ReservoirSample(all_values_, kNumInclusiveValues, empty_set, &rand_, &inclusive_values_);
+    exclusive_values_ = CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values_,
+                                                            &rand_);
+  }
+
+  template<typename BloomFilterType, typename Collection>
+  static void InsertValues(BloomFilterType* bloom_filter, const Collection& values) {
+    for (const auto& v : values) {
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
+    }
+  }
+
+  template<class Collection>
+  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
+    KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
+    InsertValues(bloom_filter, values);
+    return bloom_filter;
   }
-  ASSERT_OK(session->Flush());
+
+  template<class Collection>
+  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const Collection& values) {
+    unique_ptr<BlockBloomFilter> bloom_filter = CreateDirectBloomFilter(values.size());
+    InsertValues(bloom_filter.get(), values);
+    return bloom_filter;
+  }
+
+  void InsertAllValuesInTable() {
+    int i = 0;
+    for (auto value : all_values_) {
+      unique_ptr<KuduInsert> insert(table_->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
+      ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
+      ASSERT_OK(session_->Apply(insert.release()));
+    }
+    ASSERT_OK(session_->Flush());
+  }
+
+  // Combine supplied Bloom filter predicates that contains inclusive values
+  // with Range predicate.
+  void TestWithRangePredicate(KuduPredicate* inclusive_predicate1,
+                              KuduPredicate* inclusive_predicate2) {
+    auto* less_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS,
+                                                          KuduValue::FromInt(min_value_));
+    int actual_count_less = CountRows(table_, {inclusive_predicate1, less_predicate });
+    EXPECT_EQ(0, actual_count_less);
+
+    auto* ge_predicate = table_->NewComparisonPredicate("value", KuduPredicate::GREATER_EQUAL,
+                                                        KuduValue::FromInt(min_value_));
+    auto* le_predicate = table_->NewComparisonPredicate("value", KuduPredicate::LESS_EQUAL,
+                                                        KuduValue::FromInt(max_value_));
+    int actual_count_range = CountRows(table_,
+                                       { inclusive_predicate2, ge_predicate, le_predicate });
+    EXPECT_LE(inclusive_values_.size(), actual_count_range);
+    EXPECT_GE(inclusive_values_.size() + kFalsePositives, actual_count_range);
+  }
+};
+
+// Though this static constexpr expression is initialized in the class declaration, it needs to be
+// defined because it involves some computation.
+constexpr int BloomFilterPredicateTest::kFalsePositives;
+
+TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicate) {
+  KuduBloomFilter* inclusive_bf = CreateBloomFilterWithValues(inclusive_values_);
+  KuduBloomFilter* exclusive_bf = CreateBloomFilterWithValues(exclusive_values_);
+
+  InsertAllValuesInTable();
 
   vector<KuduBloomFilter*> inclusive_bf_vec = { inclusive_bf };
   auto* inclusive_predicate =
-      table->NewInBloomFilterPredicate("value", &inclusive_bf_vec);
+      table_->NewInBloomFilterPredicate("value", &inclusive_bf_vec);
   auto* inclusive_predicate_clone1 = inclusive_predicate->Clone();
   auto* inclusive_predicate_clone2 = inclusive_predicate->Clone();
 
   ASSERT_TRUE(inclusive_bf_vec.empty());
-  int actual_count_inclusive = CountRows(table, { inclusive_predicate });
-  EXPECT_LE(inclusive_values.size(), actual_count_inclusive);
-  EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_inclusive);
+  int actual_count_inclusive = CountRows(table_, { inclusive_predicate });
+  EXPECT_LE(inclusive_values_.size(), actual_count_inclusive);
+  EXPECT_GE(inclusive_values_.size() + kFalsePositives, actual_count_inclusive);
 
   vector<KuduBloomFilter*> exclusive_bf_vec = { exclusive_bf };
   auto* exclusive_predicate =
-      table->NewInBloomFilterPredicate("value", &exclusive_bf_vec);
+      table_->NewInBloomFilterPredicate("value", &exclusive_bf_vec);
   ASSERT_TRUE(exclusive_bf_vec.empty());
-  int actual_count_exclusive = CountRows(table, { exclusive_predicate });
+  int actual_count_exclusive = CountRows(table_, { exclusive_predicate });
+  EXPECT_LE(0, actual_count_exclusive);
+  EXPECT_GE(kFalsePositives, actual_count_exclusive);
+
+  // Combine Range predicate with Bloom filter predicate.
+  TestWithRangePredicate(inclusive_predicate_clone1, inclusive_predicate_clone2);
+}
+
+// Same as TestKuduBloomFilterPredicate above but using the overloaded NewInBloomFilterPredicate()
+// client API with direct BlockBloomFilter pointer.
+TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
+  unique_ptr<BlockBloomFilter> inclusive_bf = CreateDirectBloomFilterWithValues(inclusive_values_);
+  unique_ptr<BlockBloomFilter> exclusive_bf = CreateDirectBloomFilterWithValues(exclusive_values_);
+
+  InsertAllValuesInTable();
+
+  auto* allocator = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
+  Slice allocator_slice(reinterpret_cast<const uint8_t*>(allocator), sizeof(*allocator));
+
+  vector<Slice> inclusive_bf_vec =
+      { Slice(reinterpret_cast<const uint8_t*>(inclusive_bf.get()), sizeof(*inclusive_bf)) };
+  const size_t inclusive_bf_vec_size = inclusive_bf_vec.size();
+
+  auto* inclusive_predicate =
+      table_->NewInBloomFilterPredicate("value", allocator_slice, inclusive_bf_vec);
+  auto* inclusive_predicate_clone1 = inclusive_predicate->Clone();
+  auto* inclusive_predicate_clone2 = inclusive_predicate->Clone();
+
+  ASSERT_EQ(inclusive_bf_vec_size, inclusive_bf_vec.size());
+  int actual_count_inclusive = CountRows(table_, { inclusive_predicate });
+  EXPECT_LE(inclusive_values_.size(), actual_count_inclusive);
+  EXPECT_GE(inclusive_values_.size() + kFalsePositives, actual_count_inclusive);
+
+  vector<Slice> exclusive_bf_vec =
+      { Slice(reinterpret_cast<const uint8_t*>(exclusive_bf.get()), sizeof(*exclusive_bf)) };
+  const size_t exclusive_bf_vec_size = exclusive_bf_vec.size();
+  auto* exclusive_predicate =
+      table_->NewInBloomFilterPredicate("value", allocator_slice, exclusive_bf_vec);
+  ASSERT_EQ(exclusive_bf_vec_size, exclusive_bf_vec.size());
+
+  int actual_count_exclusive = CountRows(table_, { exclusive_predicate });
   EXPECT_LE(0, actual_count_exclusive);
   EXPECT_GE(kFalsePositives, actual_count_exclusive);
 
   // Combine Range predicate with Bloom filter predicate.
-  auto* less_predicate = table->NewComparisonPredicate("value", KuduPredicate::LESS,
-                                                       KuduValue::FromInt(*min_max_pair.first));
-  int actual_count_less = CountRows(table, {inclusive_predicate_clone1, less_predicate });
-  EXPECT_EQ(0, actual_count_less);
-
-  auto* ge_predicate = table->NewComparisonPredicate("value", KuduPredicate::GREATER_EQUAL,
-                                                     KuduValue::FromInt(*min_max_pair.first));
-  auto* le_predicate = table->NewComparisonPredicate("value", KuduPredicate::LESS_EQUAL,
-                                                     KuduValue::FromInt(*min_max_pair.second));
-  int actual_count_range = CountRows(table,
-                                     { inclusive_predicate_clone2, ge_predicate, le_predicate });
-  EXPECT_LE(inclusive_values.size(), actual_count_range);
-  EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_range);
+  TestWithRangePredicate(inclusive_predicate_clone1, inclusive_predicate_clone2);
 }
 
 class ParameterizedPredicateTest : public PredicateTest,
diff --git a/src/kudu/client/scan_predicate-internal.h b/src/kudu/client/scan_predicate-internal.h
index 5ee276e..543ae32 100644
--- a/src/kudu/client/scan_predicate-internal.h
+++ b/src/kudu/client/scan_predicate-internal.h
@@ -109,6 +109,53 @@ class InBloomFilterPredicateData : public KuduPredicate::Data {
   std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters_;
 };
 
+// Custom deleter to be used with smart pointers in InDirectBloomFilterPredicateData.
+// Allows use of smart pointers when the underlying raw pointer may or may not be
+// owned by the user. See comment in InDirectBloomFilterPredicateData for more context.
+template<typename T>
+struct DirectBloomFilterDataDeleter {
+  explicit DirectBloomFilterDataDeleter(bool owned) : owned_(owned) {
+  }
+
+  void operator()(T* ptr) {
+    if (owned_) {
+      delete ptr;
+    }
+  }
+ private:
+  bool owned_;
+};
+
+typedef std::unique_ptr<BlockBloomFilter, DirectBloomFilterDataDeleter<BlockBloomFilter>>
+    DirectBlockBloomFilterUniqPtr;
+
+class InDirectBloomFilterPredicateData : public KuduPredicate::Data {
+ public:
+  InDirectBloomFilterPredicateData(
+      ColumnSchema col,
+      std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
+      std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters)
+      : col_(std::move(col)),
+        allocator_(std::move(allocator)),
+        bloom_filters_(std::move(bloom_filters)) {
+  }
+
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
+
+  InDirectBloomFilterPredicateData* Clone() const override;
+
+ private:
+  ColumnSchema col_;
+  // This class is designed to accept BlockBloomFilter directly as raw pointer
+  // from consumers like Impala. In such a case, the data is owned by the caller and not
+  // by the instance of this class. So storing raw pointers and not destructing the pointers would
+  // have worked fine. However for the case when predicate data is Clone()'d the internal data
+  // is owned by the instance of this class. Hence using smart pointers with custom deleter
+  // DirectBloomFilterDataDeleter to keep track of ownership.
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_;
+  std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters_;
+};
+
 // A list predicate for a column and a list of constant values.
 class InListPredicateData : public KuduPredicate::Data {
  public:
diff --git a/src/kudu/client/scan_predicate.cc b/src/kudu/client/scan_predicate.cc
index 94abe4c..fd4f352 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -153,18 +153,30 @@ Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
   return Status::OK();
 }
 
-Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
+// Helper function to add Bloom filters of different types to the scan spec.
+// "func" is a functor that provides access to the underlying BlockBloomFilter ptr.
+template<typename BloomFilterType, typename BloomFilterPtrFuncType>
+static Status AddBloomFiltersToScanSpec(const ColumnSchema& col, ScanSpec* spec,
+                                        const vector<BloomFilterType>& bloom_filters,
+                                        BloomFilterPtrFuncType func) {
   // Extract the BlockBloomFilters.
   vector<BlockBloomFilter*> block_bloom_filters;
-  block_bloom_filters.reserve(bloom_filters_.size());
-  for (const auto& bf : bloom_filters_) {
-    block_bloom_filters.push_back(bf->data_->bloom_filter_.get());
+  block_bloom_filters.reserve(bloom_filters.size());
+  for (const auto& bf : bloom_filters) {
+    block_bloom_filters.push_back(func(bf));
   }
 
-  spec->AddPredicate(ColumnPredicate::InBloomFilter(col_, std::move(block_bloom_filters)));
+  spec->AddPredicate(ColumnPredicate::InBloomFilter(col, std::move(block_bloom_filters)));
   return Status::OK();
 }
 
+Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
+  return AddBloomFiltersToScanSpec(col_, spec, bloom_filters_,
+                                   [](const unique_ptr<KuduBloomFilter>& bf) {
+                                     return bf->data_->bloom_filter_.get();
+                                   });
+}
+
 InBloomFilterPredicateData* client::InBloomFilterPredicateData::Clone() const {
   vector<unique_ptr<KuduBloomFilter>> bloom_filter_clones;
   bloom_filter_clones.reserve(bloom_filters_.size());
@@ -175,6 +187,35 @@ InBloomFilterPredicateData* client::InBloomFilterPredicateData::Clone() const {
   return new InBloomFilterPredicateData(col_, std::move(bloom_filter_clones));
 }
 
+Status InDirectBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
+  return AddBloomFiltersToScanSpec(col_, spec, bloom_filters_,
+                                   [](const DirectBlockBloomFilterUniqPtr& bf) {
+                                     return bf.get();
+                                   });
+}
+
+InDirectBloomFilterPredicateData* InDirectBloomFilterPredicateData::Clone() const {
+  // In the clone case, the objects are owned by InDirectBloomFilterPredicateData
+  // and hence use the default deleter with shared_ptr.
+  shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone =
+      CHECK_NOTNULL(allocator_->Clone());
+
+  vector<DirectBlockBloomFilterUniqPtr> bloom_filter_clones;
+  bloom_filter_clones.reserve(bloom_filters_.size());
+  for (const auto& bf : bloom_filters_) {
+    unique_ptr<BlockBloomFilter> bf_clone;
+    CHECK_OK(bf->Clone(allocator_clone.get(), &bf_clone));
+
+    // Similarly for unique_ptr, specify that the BlockBloomFilter is owned by
+    // InDirectBloomFilterPredicateData.
+    DirectBlockBloomFilterUniqPtr direct_bf_clone(
+        bf_clone.release(), DirectBloomFilterDataDeleter<BlockBloomFilter>(true /*owned*/));
+    bloom_filter_clones.emplace_back(std::move(direct_bf_clone));
+  }
+  return new InDirectBloomFilterPredicateData(col_, std::move(allocator_clone),
+                                              std::move(bloom_filter_clones));
+}
+
 KuduBloomFilter::KuduBloomFilter()  {
   data_ = new Data();
 }