You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/15 12:58:00 UTC

[jira] [Commented] (PARQUET-1332) [C++] Add bloom filter utility class

    [ https://issues.apache.org/jira/browse/PARQUET-1332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581039#comment-16581039 ] 

ASF GitHub Bot commented on PARQUET-1332:
-----------------------------------------

wesm closed pull request #432: PARQUET-1332: Add bloom filter for parquet
URL: https://github.com/apache/parquet-cpp/pull/432
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/CMakeLists.txt b/CMakeLists.txt
index b53f5980..f03c53f4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -674,6 +674,7 @@ set(LIBPARQUET_SRCS
   src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
+  src/parquet/bloom_filter.cc
   src/parquet/column_reader.cc
   src/parquet/column_scanner.cc
   src/parquet/column_writer.cc
@@ -681,6 +682,7 @@ set(LIBPARQUET_SRCS
   src/parquet/file_reader.cc
   src/parquet/file_writer.cc
   src/parquet/metadata.cc
+  src/parquet/murmur3.cc
   src/parquet/parquet_constants.cpp
   src/parquet/parquet_types.cpp
   src/parquet/printer.cc
diff --git a/data/bloom_filter.bin b/data/bloom_filter.bin
new file mode 100644
index 00000000..c0e30ce7
Binary files /dev/null and b/data/bloom_filter.bin differ
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index bc16d8bd..93a242c6 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -17,6 +17,7 @@
 
 # Headers: top level
 install(FILES
+  bloom_filter.h
   column_reader.h
   column_page.h
   column_scanner.h
@@ -25,7 +26,9 @@ install(FILES
   exception.h
   file_reader.h
   file_writer.h
+  hasher.h
   metadata.h
+  murmur3.h
   printer.h
   properties.h
   schema.h
@@ -50,6 +53,7 @@ install(FILES
   "${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
+ADD_PARQUET_TEST(bloom_filter-test)
 ADD_PARQUET_TEST(column_reader-test)
 ADD_PARQUET_TEST(column_scanner-test)
 ADD_PARQUET_TEST(column_writer-test)
diff --git a/src/parquet/README b/src/parquet/README
new file mode 100644
index 00000000..fc16a46c
--- /dev/null
+++ b/src/parquet/README
@@ -0,0 +1,10 @@
+The CompatibilityTest of bloom_filter-test.cc is used to test cross compatibility of
+Bloom filters between parquet-mr and parquet-cpp. It reads the Bloom filter binary
+generated by the Bloom filter class in the parquet-mr project and tests whether the
+values inserted before could be filtered or not.
+
+The Bloom filter binary is generated by three steps from Parquet-mr:
+Step 1: Construct a Bloom filter with 1024 bytes of bitset.
+Step 2: Insert hashes of "hello", "parquet", "bloom", "filter" strings to Bloom filter
+by calling hash and insert APIs.
+Step 3: Call writeTo API to write to File.
diff --git a/src/parquet/bloom_filter-test.cc b/src/parquet/bloom_filter-test.cc
new file mode 100644
index 00000000..dbef8c8b
--- /dev/null
+++ b/src/parquet/bloom_filter-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <random>
+#include <string>
+
+#include "arrow/io/file.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/murmur3.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+namespace test {
+TEST(Murmur3Test, TestBloomFilter) {
+  uint64_t result;
+  const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
+  ByteArray byteArray(8, bitset);
+  MurmurHash3 murmur3;
+  result = murmur3.Hash(&byteArray);
+  EXPECT_EQ(result, UINT64_C(913737700387071329));
+}
+
+TEST(ConstructorTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  EXPECT_NO_THROW(bloom_filter.Init(1000));
+
+  // It throws because the length cannot be zero
+  std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);
+
+  // It throws because the number of bytes of Bloom filter bitset must be a power of 2.
+  std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(1024);
+
+  for (int i = 0; i < 10; i++) {
+    bloom_filter.InsertHash(bloom_filter.Hash(i));
+  }
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter.WriteTo(&sink);
+
+  // Deserialize Bloom filter from memory
+  InMemoryInputStream source(sink.GetBuffer());
+
+  BlockSplitBloomFilter de_bloom = std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+  }
+}
+
+// Helper function to generate random string.
+std::string GetRandomString(uint32_t length) {
+  // Character set used to generate random string
+  const std::string charset =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  // The uuid_seed was generated by "uuidgen -r"
+  const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
+  std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
+  std::mt19937 generator(seed);
+  std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1));
+  std::string ret = "";
+
+  for (uint32_t i = 0; i < length; i++) {
+    ret += charset[dist(generator)];
+  }
+
+  return ret;
+}
+
+TEST(FPPTest, TestBloomFilter) {
+  // It counts the number of times FindHash returns true.
+  int exist = 0;
+
+  // Total count of elements that will be used
+  const int total_count = 100000;
+
+  // Bloom filter fpp parameter
+  const double fpp = 0.01;
+
+  std::vector<std::string> members;
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));
+
+  // Insert elements into the Bloom filter
+  for (int i = 0; i < total_count; i++) {
+    // Insert random string which length is 8
+    std::string tmp = GetRandomString(8);
+    const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+    members.push_back(tmp);
+    bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
+  }
+
+  for (int i = 0; i < total_count; i++) {
+    const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
+    ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
+    std::string tmp = GetRandomString(7);
+    const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+
+    if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
+      exist++;
+    }
+  }
+
+  // The exist should be probably less than 1000 according default FPP 0.01.
+  EXPECT_TRUE(exist < total_count * fpp);
+}
+
+// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
+// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
+// and tests whether the values inserted before could be filtered or not.
+
+// The Bloom filter binary is generated by three steps in from Parquet-mr.
+// Step 1: Construct a Bloom filter with 1024 bytes bitset.
+// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
+// Step 3: Call writeTo API to write to File.
+TEST(CompatibilityTest, TestBloomFilter) {
+  const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
+  const std::string bloom_filter_test_binary =
+      std::string(test::get_data_dir()) + "/bloom_filter.bin";
+  std::shared_ptr<::arrow::io::ReadableFile> handle;
+
+  int64_t size;
+  PARQUET_THROW_NOT_OK(
+      ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
+  PARQUET_THROW_NOT_OK(handle->GetSize(&size));
+
+  // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
+  EXPECT_EQ(size, 1036);
+
+  std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
+  std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
+  handle->Read(size, &buffer);
+
+  InMemoryInputStream source(buffer);
+  BlockSplitBloomFilter bloom_filter1 =
+      std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+  for (int i = 0; i < 4; i++) {
+    const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
+                        reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
+  }
+
+  // The following is used to check whether the new created Bloom filter in parquet-cpp is
+  // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
+  // with same inserted hashes.
+  BlockSplitBloomFilter bloom_filter2;
+  bloom_filter2.Init(bloom_filter1.GetBitsetSize());
+  for (int i = 0; i < 4; i++) {
+    const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
+                               reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter2.WriteTo(&sink);
+  std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();
+
+  handle->Seek(0);
+  handle->GetSize(&size);
+  std::shared_ptr<Buffer> buffer2;
+  handle->Read(size, &buffer2);
+
+  EXPECT_TRUE((*buffer1).Equals(*buffer2));
+}
+
+// OptmialValueTest is used to test whether OptimalNumOfBits returns expected
+// numbers according to formula:
+//     num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive probability.
+// Also it is used to test whether OptimalNumOfBits returns value between
+// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
+TEST(OptimalValueTest, TestBloomFilter) {
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));
+
+  // Boundary check
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));
+
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
+      UINT32_C(1073741824));
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
+      UINT32_C(1073741824));
+}
+
+}  // namespace test
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.cc b/src/parquet/bloom_filter.cc
new file mode 100644
index 00000000..faa344cb
--- /dev/null
+++ b/src/parquet/bloom_filter.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+
+#include "arrow/status.h"
+#include "arrow/util/bit-util.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/murmur3.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
+
+BlockSplitBloomFilter::BlockSplitBloomFilter()
+    : pool_(::arrow::default_memory_pool()),
+      hash_strategy_(HashStrategy::MURMUR3_X64_128),
+      algorithm_(Algorithm::BLOCK) {}
+
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+  if (num_bytes < kMinimumBloomFilterBytes) {
+    num_bytes = kMinimumBloomFilterBytes;
+  }
+
+  // Get next power of 2 if it is not power of 2.
+  if ((num_bytes & (num_bytes - 1)) != 0) {
+    num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
+  }
+
+  if (num_bytes > kMaximumBloomFilterBytes) {
+    num_bytes = kMaximumBloomFilterBytes;
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memset(data_->mutable_data(), 0, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+  DCHECK(bitset != nullptr);
+
+  if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
+      (num_bytes & (num_bytes - 1)) != 0) {
+    throw ParquetException("Given length of bitset is illegal");
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memcpy(data_->mutable_data(), bitset, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(InputStream* input) {
+  int64_t bytes_available;
+
+  const uint8_t* read_buffer = NULL;
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t len;
+  memcpy(&len, read_buffer, sizeof(uint32_t));
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t hash;
+  memcpy(&hash, read_buffer, sizeof(uint32_t));
+  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
+    throw ParquetException("Unsupported hash strategy");
+  }
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t algorithm;
+  memcpy(&algorithm, read_buffer, sizeof(uint32_t));
+  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
+    throw ParquetException("Unsupported Bloom filter algorithm");
+  }
+
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(input->Read(len, &bytes_available), len);
+  return bloom_filter;
+}
+
+void BlockSplitBloomFilter::WriteTo(OutputStream* sink) const {
+  DCHECK(sink != nullptr);
+
+  sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(hash_strategy_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_));
+  sink->Write(data_->mutable_data(), num_bytes_);
+}
+
+void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = key * SALT[i];
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = block_mask.item[i] >> 27;
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
+  }
+}
+
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
+  uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
+  uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; i++) {
+    bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
+  }
+}
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.h b/src/parquet/bloom_filter.h
new file mode 100644
index 00000000..e39370a4
--- /dev/null
+++ b/src/parquet/bloom_filter.h
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_BLOOM_FILTER_H
+#define PARQUET_BLOOM_FILTER_H
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+class OutputStream;
+
+// A Bloom filter is a compact structure to indicate whether an item is not in a set or
+// probably in a set. The Bloom filter usually consists of a bit set that represents a
+// set of elements, a hash strategy and a Bloom filter algorithm.
+class BloomFilter {
+ public:
+  // Maximum Bloom filter size, it sets to HDFS default block size 128MB
+  // This value will be reconsidered when implementing Bloom filter producer.
+  static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+  /// Determine whether an element exist in set or not.
+  ///
+  /// @param hash the element to contain.
+  /// @return false if value is definitely not in set, and true means PROBABLY
+  /// in set.
+  virtual bool FindHash(uint64_t hash) const = 0;
+
+  /// Insert element to set represented by Bloom filter bitset.
+  /// @param hash the hash of value to insert into Bloom filter.
+  virtual void InsertHash(uint64_t hash) = 0;
+
+  /// Write this Bloom filter to an output stream. A Bloom filter structure should
+  /// include bitset length, hash strategy, algorithm, and bitset.
+  ///
+  /// @param sink the output stream to write
+  virtual void WriteTo(OutputStream* sink) const = 0;
+
+  /// Get the number of bytes of bitset
+  virtual uint32_t GetBitsetSize() const = 0;
+
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~BloomFilter() {}
+
+ protected:
+  // Hash strategy available for Bloom filter.
+  enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+
+  // Bloom filter algorithm.
+  enum class Algorithm : uint32_t { BLOCK = 0 };
+};
+
+// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
+// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
+//
+// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter : public BloomFilter {
+ public:
+  /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+  BlockSplitBloomFilter();
+
+  /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+  /// rounded up/down to lower/upper bound if num_bytes is out of range and also
+  /// will be rounded up to a power of 2.
+  ///
+  /// @param num_bytes The number of bytes to store Bloom filter bitset.
+  void Init(uint32_t num_bytes);
+
+  /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+  /// bitset because the given bitset may not satisfy the 32-byte alignment requirement
+  /// which may lead to segfault when performing SIMD instructions. It is the caller's
+  /// responsibility to free the bitset passed in. This is used when reconstructing
+  /// a Bloom filter from a parquet file.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @param num_bytes  The number of bytes of given bitset.
+  void Init(const uint8_t* bitset, uint32_t num_bytes);
+
+  // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+  /// Calculate optimal size according to the number of distinct values and false
+  /// positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability.
+  /// @return it always return a value between kMinimumBloomFilterBytes and
+  /// kMaximumBloomFilterBytes, and the return value is always a power of 2
+  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+    DCHECK(fpp > 0.0 && fpp < 1.0);
+    const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+    uint32_t num_bits = static_cast<uint32_t>(m);
+
+    // Handle overflow.
+    if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+      num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+    }
+
+    // Round up to lower bound
+    if (num_bits < kMinimumBloomFilterBytes << 3) {
+      num_bits = kMinimumBloomFilterBytes << 3;
+    }
+
+    // Get next power of 2 if bits is not power of 2.
+    if ((num_bits & (num_bits - 1)) != 0) {
+      num_bits = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bits));
+    }
+
+    // Round down to upper bound
+    if (num_bits > kMaximumBloomFilterBytes << 3) {
+      num_bits = kMaximumBloomFilterBytes << 3;
+    }
+
+    return num_bits;
+  }
+
+  bool FindHash(uint64_t hash) const override;
+  void InsertHash(uint64_t hash) override;
+  void WriteTo(OutputStream* sink) const override;
+  uint32_t GetBitsetSize() const override { return num_bytes_; }
+  uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(float value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(double value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const FLBA* value, uint32_t len) const override {
+    return hasher_->Hash(value, len);
+  }
+  /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
+  /// a Bloom filter from a parquet filter.
+  ///
+  /// @param input_stream The input stream from which to construct the Bloom filter
+  /// @return The BlockSplitBloomFilter.
+  static BlockSplitBloomFilter Deserialize(InputStream* input_stream);
+
+ private:
+  // Bytes in a tiny Bloom filter block.
+  static constexpr int kBytesPerFilterBlock = 32;
+
+  // The number of bits to be set in each tiny Bloom filter
+  static constexpr int kBitsSetPerBlock = 8;
+
+  // A mask structure used to set bits in each tiny Bloom filter.
+  struct BlockMask {
+    uint32_t item[kBitsSetPerBlock];
+  };
+
+  // The block-based algorithm needs eight odd SALT values to calculate eight indexes
+  // of bit to set, one bit in each 32-bit word.
+  static constexpr uint32_t SALT[kBitsSetPerBlock] = {
+      0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+  /// Set bits in mask array according to input key.
+  /// @param key the value to calculate mask values.
+  /// @param mask the mask array is used to set inside a block
+  void SetMask(uint32_t key, BlockMask& mask) const;
+
+  // Memory pool to allocate aligned buffer for bitset
+  ::arrow::MemoryPool* pool_;
+
+  // The underlying buffer of bitset.
+  std::shared_ptr<Buffer> data_;
+
+  // The number of bytes of Bloom filter bitset.
+  uint32_t num_bytes_;
+
+  // Hash strategy used in this Bloom filter.
+  HashStrategy hash_strategy_;
+
+  // Algorithm used in this Bloom filter.
+  Algorithm algorithm_;
+
+  // The hash pointer points to actual hash class used.
+  std::unique_ptr<Hasher> hasher_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_BLOOM_FILTER_H
diff --git a/src/parquet/hasher.h b/src/parquet/hasher.h
new file mode 100644
index 00000000..dc316a03
--- /dev/null
+++ b/src/parquet/hasher.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_HASHER_H
+#define PARQUET_HASHER_H
+
+#include <cstdint>
+#include "parquet/types.h"
+
+namespace parquet {
+// Abstract class for hash
+class Hasher {
+ public:
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~Hasher() = default;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_HASHER_H
diff --git a/src/parquet/murmur3.cc b/src/parquet/murmur3.cc
new file mode 100644
index 00000000..a19436e3
--- /dev/null
+++ b/src/parquet/murmur3.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "parquet/murmur3.h"
+
+namespace parquet {
+
+#if defined(_MSC_VER)
+
+#define FORCE_INLINE __forceinline
+#define ROTL64(x, y) _rotl64(x, y)
+
+#else  // defined(_MSC_VER)
+
+#define FORCE_INLINE inline __attribute__((always_inline))
+inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
+#define ROTL64(x, y) rotl64(x, y)
+
+#endif  // !defined(_MSC_VER)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
+
+FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+FORCE_INLINE uint32_t fmix32(uint32_t h) {
+  h ^= h >> 16;
+  h *= 0x85ebca6b;
+  h ^= h >> 13;
+  h *= 0xc2b2ae35;
+  h ^= h >> 16;
+
+  return h;
+}
+
+//----------
+
+FORCE_INLINE uint64_t fmix64(uint64_t k) {
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+  k ^= k >> 33;
+
+  return k;
+}
+
+//-----------------------------------------------------------------------------
+
+void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
+  const uint8_t* data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+
+  uint64_t h1 = seed;
+  uint64_t h2 = seed;
+
+  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+  //----------
+  // body
+
+  const uint64_t* blocks = (const uint64_t*)(data);
+
+  for (int i = 0; i < nblocks; i++) {
+    uint64_t k1 = getblock64(blocks, i * 2 + 0);
+    uint64_t k2 = getblock64(blocks, i * 2 + 1);
+
+    k1 *= c1;
+    k1 = ROTL64(k1, 31);
+    k1 *= c2;
+    h1 ^= k1;
+
+    h1 = ROTL64(h1, 27);
+    h1 += h2;
+    h1 = h1 * 5 + 0x52dce729;
+
+    k2 *= c2;
+    k2 = ROTL64(k2, 33);
+    k2 *= c1;
+    h2 ^= k2;
+
+    h2 = ROTL64(h2, 31);
+    h2 += h1;
+    h2 = h2 * 5 + 0x38495ab5;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
+
+  uint64_t k1 = 0;
+  uint64_t k2 = 0;
+
+  switch (len & 15) {
+    case 15:
+      k2 ^= ((uint64_t)tail[14]) << 48;
+    case 14:
+      k2 ^= ((uint64_t)tail[13]) << 40;
+    case 13:
+      k2 ^= ((uint64_t)tail[12]) << 32;
+    case 12:
+      k2 ^= ((uint64_t)tail[11]) << 24;
+    case 11:
+      k2 ^= ((uint64_t)tail[10]) << 16;
+    case 10:
+      k2 ^= ((uint64_t)tail[9]) << 8;
+    case 9:
+      k2 ^= ((uint64_t)tail[8]) << 0;
+      k2 *= c2;
+      k2 = ROTL64(k2, 33);
+      k2 *= c1;
+      h2 ^= k2;
+
+    case 8:
+      k1 ^= ((uint64_t)tail[7]) << 56;
+    case 7:
+      k1 ^= ((uint64_t)tail[6]) << 48;
+    case 6:
+      k1 ^= ((uint64_t)tail[5]) << 40;
+    case 5:
+      k1 ^= ((uint64_t)tail[4]) << 32;
+    case 4:
+      k1 ^= ((uint64_t)tail[3]) << 24;
+    case 3:
+      k1 ^= ((uint64_t)tail[2]) << 16;
+    case 2:
+      k1 ^= ((uint64_t)tail[1]) << 8;
+    case 1:
+      k1 ^= ((uint64_t)tail[0]) << 0;
+      k1 *= c1;
+      k1 = ROTL64(k1, 31);
+      k1 *= c2;
+      h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len;
+  h2 ^= len;
+
+  h1 += h2;
+  h2 += h1;
+
+  h1 = fmix64(h1);
+  h2 = fmix64(h2);
+
+  h1 += h2;
+  h2 += h1;
+
+  reinterpret_cast<uint64_t*>(out)[0] = h1;
+  reinterpret_cast<uint64_t*>(out)[1] = h2;
+}
+
+template <typename T>
+uint64_t HashHelper(T value, uint32_t seed) {
+  uint64_t output[2];
+  Hash_x64_128(reinterpret_cast<void*>(&value), sizeof(T), seed, output);
+  return output[0];
+}
+
+uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const Int96* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
+               out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const ByteArray* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
+  return out[0];
+}
+
+}  // namespace parquet
diff --git a/src/parquet/murmur3.h b/src/parquet/murmur3.h
new file mode 100644
index 00000000..84792f36
--- /dev/null
+++ b/src/parquet/murmur3.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+#ifndef PARQUET_MURMURHASH3_H_
+#define PARQUET_MURMURHASH3_H_
+
+#include <cstdint>
+
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// Source:
+/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
+/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class)
+class MurmurHash3 : public Hasher {
+ public:
+  MurmurHash3() : seed_(DEFAULT_SEED) {}
+  uint64_t Hash(int32_t value) const override;
+  uint64_t Hash(int64_t value) const override;
+  uint64_t Hash(float value) const override;
+  uint64_t Hash(double value) const override;
+  uint64_t Hash(const Int96* value) const override;
+  uint64_t Hash(const ByteArray* value) const override;
+  uint64_t Hash(const FLBA* val, uint32_t len) const override;
+
+ private:
+  // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
+  // by System.nanoTime() of java.
+  static constexpr int DEFAULT_SEED = 1361930890;
+
+  uint32_t seed_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_MURMURHASH3_H_


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [C++] Add bloom filter utility class
> ------------------------------------
>
>                 Key: PARQUET-1332
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1332
>             Project: Parquet
>          Issue Type: Sub-task
>          Components: parquet-cpp
>            Reporter: Junjie Chen
>            Assignee: Junjie Chen
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: cpp-1.5.0
>
>
> This is subtask of PARQUET-41, this JIRA is used to track adding bloom filter class to paruqet-mr and parquet-cpp.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)