You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by md...@apache.org on 2019/05/02 20:49:10 UTC

[orc] branch master updated: ORC-488: [C++] Support BloomFilter in C++ library (#379)

This is an automated email from the ASF dual-hosted git repository.

mdeepak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 61dcca9  ORC-488: [C++] Support BloomFilter in C++ library (#379)
61dcca9 is described below

commit 61dcca9d43d3da2ec03e3b984027ad2f63282430
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Thu May 2 13:49:05 2019 -0700

    ORC-488: [C++] Support BloomFilter in C++ library (#379)
    
    * ORC-488: [C++] Support BloomFilter in C++ library
    
    1. Support C++ writer to enable bloom filter of specific columns.
    2. Support C++ reader to get bloom filter of specific columns.
    3. Only bloom filter of UTF8 type is supported.
    
    * fix build
    
    * Fix types based on feedback
---
 c++/include/orc/BloomFilter.hh |  45 ++++++
 c++/include/orc/Common.hh      |  11 ++
 c++/include/orc/Reader.hh      |  12 ++
 c++/include/orc/Writer.hh      |  26 ++++
 c++/src/BloomFilter.cc         | 337 +++++++++++++++++++++++++++++++++++++++++
 c++/src/BloomFilter.hh         | 201 ++++++++++++++++++++++++
 c++/src/CMakeLists.txt         |   2 +
 c++/src/ColumnWriter.cc        | 117 +++++++++++++-
 c++/src/ColumnWriter.hh        |  12 ++
 c++/src/Murmur3.cc             |  98 ++++++++++++
 c++/src/Murmur3.hh             |  40 +++++
 c++/src/Reader.cc              |  60 +++++++-
 c++/src/Reader.hh              |   3 +
 c++/src/Writer.cc              |  31 ++++
 c++/test/CMakeLists.txt        |   1 +
 c++/test/TestBloomFilter.cc    | 219 ++++++++++++++++++++++++++
 c++/test/TestWriter.cc         |  78 ++++++++++
 17 files changed, 1291 insertions(+), 2 deletions(-)

diff --git a/c++/include/orc/BloomFilter.hh b/c++/include/orc/BloomFilter.hh
new file mode 100644
index 0000000..86c1288
--- /dev/null
+++ b/c++/include/orc/BloomFilter.hh
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOOMFILTER_HH
+#define ORC_BLOOMFILTER_HH
+
+#include "orc/orc-config.hh"
+
+#include <memory>
+#include <vector>
+
+namespace orc {
+
+  class BloomFilter {
+  public:
+    virtual ~BloomFilter();
+
+    // test if the element exists in BloomFilter
+    virtual bool testBytes(const char * data, int64_t length) const = 0;
+    virtual bool testLong(int64_t data) const = 0;
+    virtual bool testDouble(double data) const = 0;
+  };
+
+  struct BloomFilterIndex {
+    std::vector<std::shared_ptr<BloomFilter>> entries;
+  };
+
+};
+
+#endif //ORC_BLOOMFILTER_HH
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index 9bfa82d..c4523af 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -262,6 +262,17 @@ namespace orc {
     }
     return false;
   }
+
+  enum BloomFilterVersion {
+    // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+    // both old and new readers.
+    ORIGINAL = 0,
+    // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+    // See ORC-101
+    UTF8 = 1,
+    FUTURE = INT64_MAX
+  };
+
 }
 
 #endif
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 32549b5..d5f8541 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -19,13 +19,16 @@
 #ifndef ORC_READER_HH
 #define ORC_READER_HH
 
+#include "orc/BloomFilter.hh"
 #include "orc/Common.hh"
 #include "orc/orc-config.hh"
 #include "orc/Statistics.hh"
 #include "orc/Type.hh"
 #include "orc/Vector.hh"
 
+#include <map>
 #include <memory>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -472,6 +475,15 @@ namespace orc {
      */
     virtual uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) = 0;
 
+    /**
+     * Get BloomFiters of all selected columns in the specified stripe
+     * @param stripeIndex index of the stripe to be read for bloom filters.
+     * @param included index of selected columns to return (if not specified,
+     *        all columns that have bloom filters are considered).
+     * @return map of bloom filters with the key standing for the index of column.
+     */
+    virtual std::map<uint32_t, BloomFilterIndex>
+    getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;
   };
 
   /**
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index a333a23..5b33386 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -25,6 +25,7 @@
 #include "orc/Vector.hh"
 
 #include <memory>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -191,6 +192,31 @@ namespace orc {
      * @return if not set, the default is false
      */
     bool getEnableDictionary() const;
+
+    /**
+     * Set columns that use BloomFilter
+     */
+    WriterOptions& setColumnsUseBloomFilter(const std::set<uint64_t>& columns);
+
+    /**
+     * Get whether this column uses BloomFilter
+     */
+    bool isColumnUseBloomFilter(uint64_t column) const;
+
+    /**
+     * Set false positive probability of BloomFilter
+     */
+    WriterOptions& setBloomFilterFPP(double fpp);
+
+    /**
+     * Get false positive probability of BloomFilter
+     */
+    double getBloomFilterFPP() const;
+
+    /**
+     * Get version of BloomFilter
+     */
+    BloomFilterVersion getBloomFilterVersion() const;
   };
 
   class Writer {
diff --git a/c++/src/BloomFilter.cc b/c++/src/BloomFilter.cc
new file mode 100644
index 0000000..020a271
--- /dev/null
+++ b/c++/src/BloomFilter.cc
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BloomFilter.hh"
+#include "Murmur3.hh"
+
+namespace orc {
+
+  constexpr uint64_t BITS_OF_LONG = 64;
+  constexpr uint8_t  SHIFT_6_BITS = 6;
+  constexpr uint8_t  SHIFT_3_BITS = 3;
+
+  static bool isLittleEndian() {
+    static union { uint32_t i; char c[4]; } num = { 0x01020304 };
+    return num.c[0] == 4;
+  }
+
+  /**
+   * Implementation of BitSet
+   */
+  BitSet::BitSet(uint64_t numBits) {
+    mData.resize(static_cast<size_t>(ceil(
+      static_cast<double>(numBits) / BITS_OF_LONG)), 0);
+  }
+
+  BitSet::BitSet(const uint64_t * bits, uint64_t numBits) {
+    // caller should make sure numBits is multiple of 64
+    mData.resize(numBits >> SHIFT_6_BITS, 0);
+    memcpy(mData.data(), bits, numBits >> SHIFT_3_BITS);
+  }
+
+  void BitSet::set(uint64_t index) {
+    mData[index >> SHIFT_6_BITS] |= (1ULL << (index % BITS_OF_LONG));
+  }
+
+  bool BitSet::get(uint64_t index) {
+    return (mData[index >> SHIFT_6_BITS] & (1ULL << (index % BITS_OF_LONG))) != 0;
+  }
+
+  uint64_t BitSet::bitSize() {
+    return mData.size() << SHIFT_6_BITS;
+  }
+
+  void BitSet::merge(const BitSet& other) {
+    if (mData.size() != other.mData.size()) {
+      std::stringstream ss;
+      ss << "BitSet must be of equal length ("
+         << mData.size() << " != " << other.mData.size() << ")";
+      throw std::logic_error(ss.str());
+    }
+
+    for (size_t i = 0; i != mData.size(); i++) {
+      mData[i] |= other.mData[i];
+    }
+  }
+
+  void BitSet::clear() {
+    memset(mData.data(), 0, sizeof(uint64_t) * mData.size());
+  }
+
+  const uint64_t * BitSet::getData() const {
+    return mData.data();
+  }
+
+  bool BitSet::operator==(const BitSet& other) const {
+    return mData == other.mData;
+  }
+
+  /**
+   * Helper functions
+   */
+  void checkArgument(bool expression, const std::string& message) {
+    if (!expression) {
+      throw std::logic_error(message);
+    }
+  }
+
+  int32_t optimalNumOfHashFunctions(uint64_t expectedEntries, uint64_t numBits) {
+    double n = static_cast<double>(expectedEntries);
+    return std::max<int32_t>(1, static_cast<int32_t>(
+      std::round(static_cast<double>(numBits) / n * std::log(2.0))));
+  }
+
+  int32_t optimalNumOfBits(uint64_t expectedEntries, double fpp) {
+    double n = static_cast<double>(expectedEntries);
+    return static_cast<int32_t>(-n * std::log(fpp) / (std::log(2.0) * std::log(2.0)));
+  }
+
+  // Thomas Wang's integer hash function
+  // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+  inline uint64_t getLongHash(uint64_t key) {
+    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+    key = key ^ (key >> 24);
+    key = (key + (key << 3)) + (key << 8); // key * 265
+    key = key ^ (key >> 14);
+    key = (key + (key << 2)) + (key << 4); // key * 21
+    key = key ^ (key >> 28);
+    key = key + (key << 31);
+    return key;
+  }
+
+  // We use the trick mentioned in "Less Hashing, Same Performance:
+  // Building a Better Bloom Filter" by Kirsch et.al. From abstract
+  // 'only two hash functions are necessary to effectively implement
+  // a Bloom filter without any loss in the asymptotic false positive
+  // probability'
+  // Lets split up 64-bit hashcode into two 32-bit hash codes and employ
+  // the technique mentioned in the above paper
+  inline uint64_t getBytesHash(const char * data, int64_t length) {
+    if (data == nullptr) {
+      return Murmur3::NULL_HASHCODE;
+    }
+
+    return Murmur3::hash64(reinterpret_cast<const uint8_t *>(data),
+                           static_cast<uint32_t>(length));
+  }
+
+  /**
+   * Implementation of BloomFilter
+   */
+  BloomFilterImpl::BloomFilterImpl(uint64_t expectedEntries, double fpp) {
+    checkArgument(expectedEntries > 0,
+                  "expectedEntries should be > 0");
+    checkArgument(fpp > 0.0 && fpp < 1.0,
+                  "False positive probability should be > 0.0 & < 1.0");
+
+    uint64_t nb = static_cast<uint64_t>(optimalNumOfBits(expectedEntries, fpp));
+    // make 'mNumBits' multiple of 64
+    mNumBits = nb + (BITS_OF_LONG - (nb % BITS_OF_LONG));
+    mNumHashFunctions = optimalNumOfHashFunctions(expectedEntries, mNumBits);
+    mBitSet.reset(new BitSet(mNumBits));
+  }
+
+  void BloomFilterImpl::addBytes(const char * data, int64_t length) {
+    uint64_t hash64 = getBytesHash(data, length);
+    addHash(hash64);
+  }
+
+  void BloomFilterImpl::addLong(int64_t data) {
+    addHash(getLongHash(static_cast<uint64_t>(data)));
+  }
+
+  bool BloomFilterImpl::testBytes(const char * data, int64_t length) const {
+    uint64_t hash64 = getBytesHash(data, length);
+    return testHash(hash64);
+  }
+
+  bool BloomFilterImpl::testLong(int64_t data) const {
+    return testHash(getLongHash(static_cast<uint64_t>(data)));
+  }
+
+  uint64_t BloomFilterImpl::sizeInBytes() const {
+    return getBitSize() >> SHIFT_3_BITS;
+  }
+
+  uint64_t BloomFilterImpl::getBitSize() const {
+    return mBitSet->bitSize();
+  }
+
+  int32_t BloomFilterImpl::getNumHashFunctions() const {
+    return mNumHashFunctions;
+  }
+
+  DIAGNOSTIC_PUSH
+
+#if defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wundefined-reinterpret-cast")
+#endif
+
+#if defined(__GNUC__)
+  DIAGNOSTIC_IGNORE("-Wstrict-aliasing")
+#endif
+
+  // caller should make sure input proto::BloomFilter is valid since
+  // no check will be performed in the following constructor
+  BloomFilterImpl::BloomFilterImpl(const proto::BloomFilter& bloomFilter) {
+    mNumHashFunctions = static_cast<int32_t>(bloomFilter.numhashfunctions());
+
+    const std::string& bitsetStr = bloomFilter.utf8bitset();
+    mNumBits = bitsetStr.size() << SHIFT_3_BITS;
+    checkArgument(mNumBits % BITS_OF_LONG == 0, "numBits should be multiple of 64!");
+
+    const uint64_t * bitset = reinterpret_cast<const uint64_t *>(bitsetStr.data());
+    if (isLittleEndian()) {
+      mBitSet.reset(new BitSet(bitset, mNumBits));
+    } else {
+      std::vector<uint64_t> longs(mNumBits >> SHIFT_6_BITS);
+      for (size_t i = 0; i != longs.size(); ++i) {
+        // convert little-endian to big-endian
+        const uint64_t src = bitset[i];
+        uint64_t& dst = longs[i];
+        for (size_t bit = 0; bit != 64; bit += 8) {
+          dst |= (((src & (0xFFu << bit)) >> bit) << (56 - bit));
+        }
+      }
+
+      mBitSet.reset(new BitSet(longs.data(), mNumBits));
+    }
+  }
+
+  void BloomFilterImpl::addDouble(double data) {
+    addLong(reinterpret_cast<int64_t&>(data));
+  }
+
+  bool BloomFilterImpl::testDouble(double data) const{
+    return testLong(reinterpret_cast<int64_t&>(data));
+  }
+
+  DIAGNOSTIC_POP
+
+  void BloomFilterImpl::addHash(uint64_t hash64) {
+    int32_t hash1 = static_cast<int32_t>(hash64 & 0xffffffff);
+    int32_t hash2 = static_cast<int32_t>(hash64 >> 32);
+
+    for (int32_t i = 1; i <= mNumHashFunctions; ++i) {
+      int32_t combinedHash = hash1 + i * hash2;
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      uint64_t pos = static_cast<uint64_t>(combinedHash) % mNumBits;
+      mBitSet->set(pos);
+    }
+  }
+
+  bool BloomFilterImpl::testHash(uint64_t hash64) const{
+    int32_t hash1 = static_cast<int32_t>(hash64 & 0xffffffff);
+    int32_t hash2 = static_cast<int32_t>(hash64 >> 32);
+
+    for (int32_t i = 1; i <= mNumHashFunctions; ++i) {
+      int32_t combinedHash = hash1 + i * hash2;
+      // hashcode should be positive, flip all the bits if it's negative
+      if (combinedHash < 0) {
+        combinedHash = ~combinedHash;
+      }
+      uint64_t pos = static_cast<uint64_t>(combinedHash) % mNumBits;
+      if (!mBitSet->get(pos)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  void BloomFilterImpl::merge(const BloomFilterImpl& other) {
+    if (mNumBits != other.mNumBits || mNumHashFunctions != other.mNumHashFunctions) {
+      std::stringstream ss;
+      ss << "BloomFilters are not compatible for merging: "
+         << "this: numBits:" << mNumBits
+         << ",numHashFunctions:" << mNumHashFunctions
+         << ", that: numBits:" << other.mNumBits
+         << ",numHashFunctions:" << other.mNumHashFunctions;
+      throw std::logic_error(ss.str());
+    }
+
+    mBitSet->merge(*other.mBitSet);
+  }
+
+  void BloomFilterImpl::reset() {
+    mBitSet->clear();
+  }
+
+  void BloomFilterImpl::serialize(proto::BloomFilter& bloomFilter) const {
+    bloomFilter.set_numhashfunctions(static_cast<uint32_t>(mNumHashFunctions));
+
+    // According to ORC standard, the encoding is a sequence of bytes with
+    // a little endian encoding in the utf8bitset field.
+    if (isLittleEndian()) {
+      // bytes are already organized in little endian; thus no conversion needed
+      const char * bitset = reinterpret_cast<const char *>(mBitSet->getData());
+      bloomFilter.set_utf8bitset(bitset, sizeInBytes());
+    } else {
+      std::vector<uint64_t> bitset(sizeInBytes() / sizeof(uint64_t), 0);
+      const uint64_t * longs = mBitSet->getData();
+      for (size_t i = 0; i != bitset.size(); ++i) {
+        uint64_t& dst = bitset[i];
+        const uint64_t src = longs[i];
+        // convert big-endian to little-endian
+        for (size_t bit = 0; bit != 64; bit += 8) {
+          dst |= (((src & (0xFFu << bit)) >> bit) << (56 - bit));
+        }
+      }
+      bloomFilter.set_utf8bitset(bitset.data(), sizeInBytes());
+    }
+  }
+
+  bool BloomFilterImpl::operator==(const BloomFilterImpl& other) const {
+    return mNumBits == other.mNumBits &&
+           mNumHashFunctions == other.mNumHashFunctions &&
+           *mBitSet == *other.mBitSet;
+  }
+
+  BloomFilter::~BloomFilter() {
+    // PASS
+  }
+
+  std::unique_ptr<BloomFilter> BloomFilterUTF8Utils::deserialize(
+    const proto::Stream_Kind& streamKind,
+    const proto::ColumnEncoding& encoding,
+    const proto::BloomFilter& bloomFilter) {
+
+    std::unique_ptr<BloomFilter> ret(nullptr);
+
+    // only BLOOM_FILTER_UTF8 is supported
+    if (streamKind != proto::Stream_Kind_BLOOM_FILTER_UTF8) {
+      return ret;
+    }
+
+    // make sure we don't use unknown encodings or original timestamp encodings
+    if (!encoding.has_bloomencoding() || encoding.bloomencoding() != 1) {
+      return ret;
+    }
+
+    // make sure all required fields exist
+    if (!bloomFilter.has_numhashfunctions() || !bloomFilter.has_utf8bitset()) {
+      return ret;
+    }
+
+    ret.reset(new BloomFilterImpl(bloomFilter));
+    return ret;
+  }
+
+}
diff --git a/c++/src/BloomFilter.hh b/c++/src/BloomFilter.hh
new file mode 100644
index 0000000..8c0d73a
--- /dev/null
+++ b/c++/src/BloomFilter.hh
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOOMFILTER_IMPL_HH
+#define ORC_BLOOMFILTER_IMPL_HH
+
+#include "orc/BloomFilter.hh"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <cmath>
+#include <sstream>
+#include <vector>
+
+namespace orc {
+
+  /**
+   * Bare metal bit set implementation. For performance reasons, this implementation does not check
+   * for index bounds nor expand the bit set size if the specified index is greater than the size.
+   */
+  class BitSet {
+  public:
+    /**
+     * Creates an empty BitSet
+     *
+     * @param numBits - number of bits used
+     */
+    BitSet(uint64_t numBits);
+
+    /**
+     * Creates BitSet from serialized uint64_t buffer
+     *
+     * @param bits - serialized uint64_t buffer of bitset
+     * @param numBits - number of bits used
+     */
+    BitSet(const uint64_t * bits, uint64_t numBits);
+
+    /**
+     * Sets the bit at specified index.
+     *
+     * @param index - position
+     */
+    void set(uint64_t index);
+
+    /**
+     * Returns true if the bit is set in the specified index.
+     *
+     * @param index - position
+     * @return - value at the bit position
+     */
+    bool get(uint64_t index);
+
+    /**
+     * Number of bits
+     */
+    uint64_t bitSize();
+
+    /**
+     * Combines the two BitSets using bitwise OR.
+     */
+    void merge(const BitSet& other);
+
+    /**
+     * Clears the bit set.
+     */
+    void clear();
+
+    /**
+     * Gets underlying raw data
+     */
+    const uint64_t * getData() const;
+
+    /**
+     * Compares two BitSets
+     */
+    bool operator==(const BitSet& other) const;
+
+  private:
+    std::vector<uint64_t> mData;
+  };
+
+  /**
+   * BloomFilter is a probabilistic data structure for set membership check.
+   * BloomFilters are highly space efficient when compared to using a HashSet.
+   * Because of the probabilistic nature of bloom filter false positive (element
+   * not present in bloom filter but test() says true) are possible but false
+   * negatives are not possible (if element is present then test() will never
+   * say false). The false positive probability is configurable (default: 5%)
+   * depending on which storage requirement may increase or decrease. Lower the
+   * false positive probability greater is the space requirement.
+   *
+   * Bloom filters are sensitive to number of elements that will be inserted in
+   * the bloom filter. During the creation of bloom filter expected number of
+   * entries must be specified. If the number of insertions exceed the specified
+   * initial number of entries then false positive probability will increase
+   * accordingly.
+   *
+   * Internally, this implementation of bloom filter uses Murmur3 fast
+   * non-cryptographic hash algorithm. Although Murmur2 is slightly faster than
+   * Murmur3 in Java, it suffers from hash collisions for specific sequence of
+   * repeating bytes. Check the following link for more info
+   * https://code.google.com/p/smhasher/wiki/MurmurHash2Flaw
+   *
+   * Note that this class is here for backwards compatibility, because it uses
+   * the JVM default character set for strings. All new users should
+   * BloomFilterUtf8, which always uses UTF8 for the encoding.
+   */
+  class BloomFilterImpl : public BloomFilter {
+  public:
+    /**
+     * Creates an empty BloomFilter
+     *
+     * @param expectedEntries - number of entries it will hold
+     * @param fpp - false positive probability
+     */
+    BloomFilterImpl(uint64_t expectedEntries, double fpp);
+
+    BloomFilterImpl(uint64_t expectedEntries)
+                   :BloomFilterImpl(expectedEntries, DEFAULT_FPP) {
+      // PASS
+    }
+
+    /**
+     * Creates a BloomFilter by deserializing the proto-buf version
+     *
+     * caller should make sure input proto::BloomFilter is valid
+     */
+    BloomFilterImpl(const proto::BloomFilter& bloomFilter);
+
+    /**
+     * Adds a new element to the BloomFilter
+     */
+    void addBytes(const char * data, int64_t length);
+    void addLong(int64_t data);
+    void addDouble(double data);
+
+    /**
+     * Test if the element exists in BloomFilter
+     */
+    bool testBytes(const char * data, int64_t length) const override;
+    bool testLong(int64_t data) const override;
+    bool testDouble(double data) const override;
+
+    uint64_t sizeInBytes() const;
+    uint64_t getBitSize() const;
+    int32_t getNumHashFunctions() const;
+
+    void merge(const BloomFilterImpl& other);
+
+    void reset();
+
+    bool operator==(const BloomFilterImpl& other) const;
+
+  private:
+    friend struct BloomFilterUTF8Utils;
+
+    // compute k hash values from hash64 and set bits
+    void addHash(uint64_t hash64);
+
+    // compute k hash values from hash64 and check bits
+    bool testHash(uint64_t hash64) const;
+
+    void serialize(proto::BloomFilter& bloomFilter) const;
+
+  private:
+    static constexpr double DEFAULT_FPP = 0.05;
+    uint64_t mNumBits;
+    int32_t mNumHashFunctions;
+    std::unique_ptr<BitSet> mBitSet;
+  };
+
+  struct BloomFilterUTF8Utils {
+    // serialize BloomFilter in protobuf
+    static void serialize(const BloomFilterImpl& in, proto::BloomFilter& out) {
+      in.serialize(out);
+    }
+
+    // deserialize BloomFilter from protobuf
+    static std::unique_ptr<BloomFilter>
+    deserialize(const proto::Stream_Kind& streamKind,
+                const proto::ColumnEncoding& columnEncoding,
+                const proto::BloomFilter& bloomFilter);
+  };
+
+}
+
+#endif //ORC_BLOOMFILTER_IMPL_HH
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 235ced8..1991f74 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -186,6 +186,7 @@ set(SOURCE_FILES
   io/OutputStream.cc
   wrap/orc-proto-wrapper.cc
   Adaptor.cc
+  BloomFilter.cc
   ByteRLE.cc
   ColumnPrinter.cc
   ColumnReader.cc
@@ -196,6 +197,7 @@ set(SOURCE_FILES
   Int128.cc
   LzoDecompressor.cc
   MemoryPool.cc
+  Murmur3.cc
   OrcFile.cc
   Reader.cc
   RLEv1.cc
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 8c21ed3..30d96ac 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -97,8 +97,10 @@ namespace orc {
                                 rowIndex(),
                                 rowIndexEntry(),
                                 rowIndexPosition(),
+                                enableBloomFilter(false),
                                 memPool(*options.getMemoryPool()),
-                                indexStream() {
+                                indexStream(),
+                                bloomFilterStream() {
 
     std::unique_ptr<BufferedOutputStream> presentStream =
         factory.createStream(proto::Stream_Kind_PRESENT);
@@ -116,6 +118,16 @@ namespace orc {
                      new RowIndexPositionRecorder(*rowIndexEntry));
       indexStream =
         factory.createStream(proto::Stream_Kind_ROW_INDEX);
+
+      // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported
+      if (options.isColumnUseBloomFilter(columnId)
+          && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) {
+        enableBloomFilter = true;
+        bloomFilter.reset(new BloomFilterImpl(
+          options.getRowIndexStride(), options.getBloomFilterFPP()));
+        bloomFilterIndex.reset(new proto::BloomFilterIndex());
+        bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8);
+      }
     }
   }
 
@@ -174,9 +186,18 @@ namespace orc {
     colStripeStatistics->merge(*colIndexStatistics);
     colIndexStatistics->reset();
 
+    addBloomFilterEntry();
+
     recordPosition();
   }
 
+  void ColumnWriter::addBloomFilterEntry() {
+    if (enableBloomFilter) {
+      BloomFilterUTF8Utils::serialize(*bloomFilter, *bloomFilterIndex->add_bloomfilter());
+      bloomFilter->reset();
+    }
+  }
+
   void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
     // write row index to output stream
     rowIndex->SerializeToZeroCopyStream(indexStream.get());
@@ -187,6 +208,17 @@ namespace orc {
     stream.set_column(static_cast<uint32_t>(columnId));
     stream.set_length(indexStream->flush());
     streams.push_back(stream);
+
+    // write BLOOM_FILTER_UTF8 stream
+    if (enableBloomFilter) {
+      if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) {
+        throw std::logic_error("Failed to write bloom filter stream.");
+      }
+      stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8);
+      stream.set_column(static_cast<uint32_t>(columnId));
+      stream.set_length(bloomFilterStream->flush());
+      streams.push_back(stream);
+    }
   }
 
   void ColumnWriter::recordPosition() const {
@@ -203,6 +235,11 @@ namespace orc {
       // write current positions
       recordPosition();
     }
+
+    if (enableBloomFilter) {
+      bloomFilter->reset();
+      bloomFilterIndex->clear_bloomfilter();
+    }
   }
 
   void ColumnWriter::writeDictionary() {
@@ -474,6 +511,9 @@ namespace orc {
     for (uint64_t i = 0; i < numValues; ++i) {
       if (notNull == nullptr || notNull[i]) {
         ++count;
+        if (enableBloomFilter) {
+          bloomFilter->addLong(data[i]);
+        }
         intStats->update(data[i], 1);
       }
     }
@@ -504,6 +544,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -580,6 +623,9 @@ namespace orc {
     for (uint64_t i = 0; i < numValues; ++i) {
       if (notNull == nullptr || notNull[i]) {
         ++count;
+        if (enableBloomFilter) {
+          bloomFilter->addLong(data[i]);
+        }
         intStats->update(static_cast<int64_t>(byteData[i]), 1);
       }
     }
@@ -610,6 +656,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -686,6 +735,9 @@ namespace orc {
     for (uint64_t i = 0; i < numValues; ++i) {
       if (notNull == nullptr || notNull[i]) {
         ++count;
+        if (enableBloomFilter) {
+          bloomFilter->addLong(data[i]);
+        }
         boolStats->update(byteData[i] != 0, 1);
       }
     }
@@ -716,6 +768,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -811,6 +866,9 @@ namespace orc {
         }
         dataStream->write(data, bytes);
         ++count;
+        if (enableBloomFilter) {
+          bloomFilter->addDouble(doubleData[i]);
+        }
         doubleStats->update(doubleData[i]);
       }
     }
@@ -841,6 +899,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -1112,6 +1173,9 @@ namespace orc {
         } else {
           directDataStream->write(data[i], len);
         }
+        if (enableBloomFilter) {
+          bloomFilter->addBytes(data[i], static_cast<int64_t>(len));
+        }
         strStats->update(data[i], len);
         ++count;
       }
@@ -1187,6 +1251,9 @@ namespace orc {
                         proto::ColumnEncoding_Kind_DICTIONARY_V2);
     }
     encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size()));
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -1489,6 +1556,9 @@ namespace orc {
           directDataStream->write(charData, static_cast<size_t>(length[i]));
         }
 
+        if (enableBloomFilter) {
+          bloomFilter->addBytes(data[i], length[i]);
+        }
         strStats->update(charData, static_cast<size_t>(length[i]));
         ++count;
       }
@@ -1559,6 +1629,9 @@ namespace orc {
           directDataStream->write(data[i], static_cast<size_t>(length[i]));
         }
 
+        if (enableBloomFilter) {
+          bloomFilter->addBytes(data[i], length[i]);
+        }
         strStats->update(data[i], static_cast<size_t>(length[i]));
         ++count;
       }
@@ -1733,6 +1806,9 @@ namespace orc {
         // TimestampVectorBatch already stores data in UTC
         int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
         ++count;
+        if (enableBloomFilter) {
+          bloomFilter->addLong(millsUTC);
+        }
         tsStats->update(millsUTC);
 
         if (secs[i] < 0 && nanos[i] != 0) {
@@ -1780,6 +1856,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -1838,6 +1917,9 @@ namespace orc {
       if (!notNull || notNull[i]) {
         ++count;
         dateStats->update(static_cast<int32_t>(data[i]));
+        if (enableBloomFilter) {
+          bloomFilter->addLong(data[i]);
+        }
       }
     }
     dateStats->increase(count);
@@ -1942,6 +2024,12 @@ namespace orc {
         }
         valueStream->write(buffer, static_cast<size_t>(data - buffer));
         ++count;
+        if (enableBloomFilter) {
+          std::string decimal = Decimal(
+            values[i], static_cast<int32_t>(scale)).toString();
+          bloomFilter->addBytes(
+            decimal.c_str(), static_cast<int64_t>(decimal.size()));
+        }
         decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
       }
     }
@@ -1981,6 +2069,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
   }
 
@@ -2067,6 +2158,12 @@ namespace orc {
         valueStream->write(buffer, static_cast<size_t>(data - buffer));
 
         ++count;
+        if (enableBloomFilter) {
+          std::string decimal = Decimal(
+            values[i], static_cast<int32_t>(scale)).toString();
+          bloomFilter->addBytes(
+            decimal.c_str(), static_cast<int64_t>(decimal.size()));
+        }
         decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
       }
     }
@@ -2188,6 +2285,9 @@ namespace orc {
         for (uint64_t i = 0; i < numValues; ++i) {
           if (notNull[i]) {
             ++count;
+            if (enableBloomFilter) {
+              bloomFilter->addLong(offsets[i]);
+            }
           }
         }
         colIndexStatistics->increase(count);
@@ -2233,6 +2333,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
     if (child.get()) {
       child->getColumnEncoding(encodings);
@@ -2412,6 +2515,9 @@ namespace orc {
         for (uint64_t i = 0; i < numValues; ++i) {
           if (notNull[i]) {
             ++count;
+            if (enableBloomFilter) {
+              bloomFilter->addLong(offsets[i]);
+            }
           }
         }
         colIndexStatistics->increase(count);
@@ -2467,6 +2573,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
     if (keyWriter.get()) {
       keyWriter->getColumnEncoding(encodings);
@@ -2668,6 +2777,9 @@ namespace orc {
         for (uint64_t i = 0; i < numValues; ++i) {
           if (notNull[i]) {
             ++count;
+            if (enableBloomFilter) {
+              bloomFilter->addLong(tags[i]);
+            }
           }
         }
         colIndexStatistics->increase(count);
@@ -2713,6 +2825,9 @@ namespace orc {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
     encodings.push_back(encoding);
     for (uint32_t i = 0; i < children.size(); ++i) {
       children[i]->getColumnEncoding(encodings);
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 2364066..cbbb5d0 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -21,6 +21,7 @@
 
 #include "orc/Vector.hh"
 
+#include "BloomFilter.hh"
 #include "ByteRLE.hh"
 #include "Compression.hh"
 #include "orc/Exceptions.hh"
@@ -82,6 +83,11 @@ namespace orc {
     std::unique_ptr<proto::RowIndexEntry> rowIndexEntry;
     std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition;
 
+    // bloom filters are recorded per row group
+    bool enableBloomFilter;
+    std::unique_ptr<BloomFilterImpl> bloomFilter;
+    std::unique_ptr<proto::BloomFilterIndex> bloomFilterIndex;
+
   public:
     ColumnWriter(const Type& type, const StreamsFactory& factory,
                  const WriterOptions& options);
@@ -153,6 +159,11 @@ namespace orc {
     virtual void createRowIndexEntry();
 
     /**
+     * Create a new BloomFilter entry and add the previous one to BloomFilterIndex
+     */
+    virtual void addBloomFilterEntry();
+
+    /**
      * Write row index streams for this column.
      * @param streams output list of ROW_INDEX streams
      */
@@ -195,6 +206,7 @@ namespace orc {
   protected:
     MemoryPool& memPool;
     std::unique_ptr<BufferedOutputStream> indexStream;
+    std::unique_ptr<BufferedOutputStream> bloomFilterStream;
   };
 
   /**
diff --git a/c++/src/Murmur3.cc b/c++/src/Murmur3.cc
new file mode 100644
index 0000000..b45bd6d
--- /dev/null
+++ b/c++/src/Murmur3.cc
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Adaptor.hh"
+#include "Murmur3.hh"
+
+#define ROTL64(x, r) ((x << r) | (x >> (64 - r)))
+
+namespace orc {
+
+  inline uint64_t rotl64 ( uint64_t x, int8_t r ) {
+    return (x << r) | (x >> (64 - r));
+  }
+
+  inline uint64_t Murmur3::fmix64(uint64_t value) {
+    value ^= (value >> 33);
+    value *= 0xff51afd7ed558ccdL;
+    value ^= (value >> 33);
+    value *= 0xc4ceb9fe1a85ec53L;
+    value ^= (value >> 33);
+    return value;
+  }
+
+  uint64_t Murmur3::hash64(const uint8_t *data, uint32_t len) {
+    return hash64(data, len, DEFAULT_SEED);
+  }
+
+  DIAGNOSTIC_PUSH
+
+#if defined(__clang__)
+    DIAGNOSTIC_IGNORE("-Wimplicit-fallthrough")
+#endif
+
+  uint64_t Murmur3::hash64(const uint8_t *data, uint32_t len, uint32_t seed) {
+    uint64_t h = seed;
+    uint32_t blocks = len >> 3;
+
+    const uint64_t* src = reinterpret_cast<const uint64_t*>(data);
+    uint64_t c1 = 0x87c37b91114253d5L;
+    uint64_t c2 = 0x4cf5ad432745937fL;
+    for (uint32_t i = 0; i < blocks; i++) {
+      uint64_t k = src[i];
+      k *= c1;
+      k = ROTL64(k, 31);
+      k *= c2;
+
+      h ^= k;
+      h = ROTL64(h, 27);
+      h = h * 5 + 0x52dce729;
+    }
+
+    uint64_t k = 0;
+    uint32_t idx = blocks << 3;
+    switch (len - idx) {
+      case 7:
+        k ^= static_cast<uint64_t>(data[idx + 6]) << 48;
+      case 6:
+        k ^= static_cast<uint64_t>(data[idx + 5]) << 40;
+      case 5:
+        k ^= static_cast<uint64_t>(data[idx + 4]) << 32;
+      case 4:
+        k ^= static_cast<uint64_t>(data[idx + 3]) << 24;
+      case 3:
+        k ^= static_cast<uint64_t>(data[idx + 2]) << 16;
+      case 2:
+        k ^= static_cast<uint64_t>(data[idx + 1]) << 8;
+      case 1:
+        k ^= static_cast<uint64_t>(data[idx + 0]);
+
+        k *= c1;
+        k = ROTL64(k, 31);
+        k *= c2;
+        h ^= k;
+    }
+
+    h ^= len;
+    h = fmix64(h);
+    return h;
+  }
+
+  DIAGNOSTIC_POP
+
+}
diff --git a/c++/src/Murmur3.hh b/c++/src/Murmur3.hh
new file mode 100644
index 0000000..0e7b1a2
--- /dev/null
+++ b/c++/src/Murmur3.hh
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MURMUR3_HH
+#define ORC_MURMUR3_HH
+
+#include "orc/orc-config.hh"
+
+namespace orc {
+
+  class Murmur3 {
+  public:
+    enum { DEFAULT_SEED = 104729 };
+    enum { NULL_HASHCODE = 2862933555777941757LL };
+
+    static uint64_t hash64(const uint8_t *data, uint32_t len);
+
+  private:
+    static uint64_t fmix64(uint64_t value);
+    static uint64_t hash64(const uint8_t* data, uint32_t len, uint32_t seed);
+  };
+
+}
+
+#endif //ORC_MURMUR3_HH
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 8f29327..c61bb68 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-
+#include "BloomFilter.hh"
 #include "Options.hh"
 #include "Reader.hh"
 #include "Statistics.hh"
@@ -1061,6 +1061,64 @@ namespace orc {
                                                   postscriptLength));
   }
 
+  std::map<uint32_t, BloomFilterIndex>
+  ReaderImpl::getBloomFilters(uint32_t stripeIndex,
+                              const std::set<uint32_t>& included) const {
+    std::map<uint32_t, BloomFilterIndex> ret;
+
+    // find stripe info
+    if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) {
+      throw std::logic_error("Illegal stripe index: " + std::to_string(stripeIndex));
+    }
+    const proto::StripeInformation currentStripeInfo =
+      footer->stripes(static_cast<int>(stripeIndex));
+    const proto::StripeFooter currentStripeFooter =
+      getStripeFooter(currentStripeInfo, *contents);
+
+    // iterate stripe footer to get stream of bloomfilter
+    uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
+    for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
+      const proto::Stream& stream = currentStripeFooter.streams(i);
+      uint32_t column = static_cast<uint32_t>(stream.column());
+      uint64_t length = static_cast<uint64_t>(stream.length());
+
+      // a bloom filter stream from a selected column is found
+      if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
+          (included.empty() || included.find(column) != included.end())) {
+
+        std::unique_ptr<SeekableInputStream> pbStream =
+          createDecompressor(contents->compression,
+                             std::unique_ptr<SeekableInputStream>
+                               (new SeekableFileInputStream(contents->stream.get(),
+                                                            offset,
+                                                            length,
+                                                            *contents->pool)),
+                             contents->blockSize,
+                             *(contents->pool));
+
+        proto::BloomFilterIndex pbBFIndex;
+        if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
+          throw ParseError("Failed to parse BloomFilterIndex");
+        }
+
+        BloomFilterIndex bfIndex;
+        for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
+          bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
+            stream.kind(),
+            currentStripeFooter.columns(static_cast<int>(stream.column())),
+            pbBFIndex.bloomfilter(j)));
+        }
+
+        // add bloom filters to result for one column
+        ret[column] = bfIndex;
+      }
+
+      offset += length;
+    }
+
+    return ret;
+  }
+
   RowReader::~RowReader() {
     // PASS
   }
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 4efa894..75eb0bb 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -286,6 +286,9 @@ namespace orc {
     uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) override;
 
     uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+
+    std::map<uint32_t, BloomFilterIndex>
+    getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
   };
 
 }// namespace
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 72f7ba7..826334b 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -38,6 +38,9 @@ namespace orc {
     FileVersion fileVersion;
     double dictionaryKeySizeThreshold;
     bool enableIndex;
+    std::set<uint64_t> columnsUseBloomFilter;
+    double bloomFilterFalsePositiveProb;
+    BloomFilterVersion bloomFilterVersion;
 
     WriterOptionsPrivate() :
                             fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
@@ -51,6 +54,8 @@ namespace orc {
       errorStream = &std::cerr;
       dictionaryKeySizeThreshold = 0.0;
       enableIndex = true;
+      bloomFilterFalsePositiveProb = 0.05;
+      bloomFilterVersion = UTF8;
     }
   };
 
@@ -200,6 +205,32 @@ namespace orc {
     return privateBits->dictionaryKeySizeThreshold > 0.0;
   }
 
+  WriterOptions& WriterOptions::setColumnsUseBloomFilter(
+    const std::set<uint64_t>& columns) {
+    privateBits->columnsUseBloomFilter = columns;
+    return *this;
+  }
+
+  bool WriterOptions::isColumnUseBloomFilter(uint64_t column) const {
+    return privateBits->columnsUseBloomFilter.find(column) !=
+           privateBits->columnsUseBloomFilter.end();
+  }
+
+  WriterOptions& WriterOptions::setBloomFilterFPP(double fpp) {
+    privateBits->bloomFilterFalsePositiveProb = fpp;
+    return *this;
+  }
+
+  double WriterOptions::getBloomFilterFPP() const {
+    return privateBits->bloomFilterFalsePositiveProb;
+  }
+
+  // delibrately not provide setter to write bloom filter version because
+  // we only support UTF8 for now.
+  BloomFilterVersion WriterOptions::getBloomFilterVersion() const {
+    return privateBits->bloomFilterVersion;
+  }
+
   Writer::~Writer() {
     // PASS
   }
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index db68578..11e8cf4 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -26,6 +26,7 @@ add_executable (orc-test
   MemoryInputStream.cc
   MemoryOutputStream.cc
   TestBufferedOutputStream.cc
+  TestBloomFilter.cc
   TestByteRle.cc
   TestByteRLEEncoder.cc
   TestColumnPrinter.cc
diff --git a/c++/test/TestBloomFilter.cc b/c++/test/TestBloomFilter.cc
new file mode 100644
index 0000000..c769f51
--- /dev/null
+++ b/c++/test/TestBloomFilter.cc
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BloomFilter.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/gmock.h"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  TEST(TestBloomFilter, testBitSetEqual) {
+    BitSet bitSet64_1(64), bitSet64_2(64), bitSet32(128);
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+    EXPECT_FALSE(bitSet64_1 == bitSet32);
+
+    bitSet64_1.set(6U);
+    bitSet64_1.set(16U);
+    bitSet64_1.set(26U);
+    bitSet64_2.set(6U);
+    bitSet64_2.set(16U);
+    bitSet64_2.set(26U);
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+    EXPECT_EQ(bitSet64_1.get(6U), bitSet64_2.get(6U));
+    EXPECT_EQ(bitSet64_1.get(16U), bitSet64_2.get(16U));
+    EXPECT_EQ(bitSet64_1.get(26U), bitSet64_2.get(26U));
+
+    bitSet64_1.set(36U);
+    bitSet64_2.set(46U);
+    EXPECT_FALSE(bitSet64_1 == bitSet64_2);
+    EXPECT_TRUE(bitSet64_1.get(36U));
+    EXPECT_TRUE(bitSet64_2.get(46U));
+
+    bitSet64_1.clear();
+    bitSet64_2.clear();
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+  }
+
+  // ported from Java ORC
+  TEST(TestBloomFilter, testSetGetBitSet) {
+    BitSet bitset(128);
+
+    // set every 9th bit for a rotating pattern
+    for (uint64_t l = 0; l < 8; ++l) {
+      bitset.set(l * 9);
+    }
+
+    // set every non-9th bit
+    for (uint64_t l = 8; l < 16; ++l) {
+      for(uint64_t b = 0; b < 8; ++b) {
+        if (b != l - 8) {
+          bitset.set(l * 8 + b);
+        }
+      }
+    }
+
+    for(uint64_t b = 0; b < 64; ++b) {
+      EXPECT_EQ(b % 9 == 0, bitset.get(b));
+    }
+
+    for(uint64_t b = 64; b < 128; ++b) {
+      EXPECT_EQ((b % 8) != (b - 64) / 8, bitset.get(b));
+    }
+
+    // test that the longs are mapped correctly
+    const uint64_t * longs = bitset.getData();
+    EXPECT_EQ(128, bitset.bitSize());
+    EXPECT_EQ(0x8040201008040201L, longs[0]);
+    EXPECT_EQ(~0x8040201008040201L, longs[1]);
+  }
+
+  TEST(TestBloomFilter, testBloomFilterBasicOperations) {
+    BloomFilterImpl bloomFilter(128);
+
+    // test integers
+    bloomFilter.reset();
+    EXPECT_FALSE(bloomFilter.testLong(1));
+    EXPECT_FALSE(bloomFilter.testLong(11));
+    EXPECT_FALSE(bloomFilter.testLong(111));
+    EXPECT_FALSE(bloomFilter.testLong(1111));
+    EXPECT_FALSE(bloomFilter.testLong(0));
+    EXPECT_FALSE(bloomFilter.testLong(-1));
+    EXPECT_FALSE(bloomFilter.testLong(-11));
+    EXPECT_FALSE(bloomFilter.testLong(-111));
+    EXPECT_FALSE(bloomFilter.testLong(-1111));
+
+    bloomFilter.addLong(1);
+    bloomFilter.addLong(11);
+    bloomFilter.addLong(111);
+    bloomFilter.addLong(1111);
+    bloomFilter.addLong(0);
+    bloomFilter.addLong(-1);
+    bloomFilter.addLong(-11);
+    bloomFilter.addLong(-111);
+    bloomFilter.addLong(-1111);
+
+    EXPECT_TRUE(bloomFilter.testLong(1));
+    EXPECT_TRUE(bloomFilter.testLong(11));
+    EXPECT_TRUE(bloomFilter.testLong(111));
+    EXPECT_TRUE(bloomFilter.testLong(1111));
+    EXPECT_TRUE(bloomFilter.testLong(0));
+    EXPECT_TRUE(bloomFilter.testLong(-1));
+    EXPECT_TRUE(bloomFilter.testLong(-11));
+    EXPECT_TRUE(bloomFilter.testLong(-111));
+    EXPECT_TRUE(bloomFilter.testLong(-1111));
+
+    // test doubles
+    bloomFilter.reset();
+    EXPECT_FALSE(bloomFilter.testDouble(1.1));
+    EXPECT_FALSE(bloomFilter.testDouble(11.11));
+    EXPECT_FALSE(bloomFilter.testDouble(111.111));
+    EXPECT_FALSE(bloomFilter.testDouble(1111.1111));
+    EXPECT_FALSE(bloomFilter.testDouble(0.0));
+    EXPECT_FALSE(bloomFilter.testDouble(-1.1));
+    EXPECT_FALSE(bloomFilter.testDouble(-11.11));
+    EXPECT_FALSE(bloomFilter.testDouble(-111.111));
+    EXPECT_FALSE(bloomFilter.testDouble(-1111.1111));
+
+    bloomFilter.addDouble(1.1);
+    bloomFilter.addDouble(11.11);
+    bloomFilter.addDouble(111.111);
+    bloomFilter.addDouble(1111.1111);
+    bloomFilter.addDouble(0.0);
+    bloomFilter.addDouble(-1.1);
+    bloomFilter.addDouble(-11.11);
+    bloomFilter.addDouble(-111.111);
+    bloomFilter.addDouble(-1111.1111);
+
+    EXPECT_TRUE(bloomFilter.testDouble(1.1));
+    EXPECT_TRUE(bloomFilter.testDouble(11.11));
+    EXPECT_TRUE(bloomFilter.testDouble(111.111));
+    EXPECT_TRUE(bloomFilter.testDouble(1111.1111));
+    EXPECT_TRUE(bloomFilter.testDouble(0.0));
+    EXPECT_TRUE(bloomFilter.testDouble(-1.1));
+    EXPECT_TRUE(bloomFilter.testDouble(-11.11));
+    EXPECT_TRUE(bloomFilter.testDouble(-111.111));
+    EXPECT_TRUE(bloomFilter.testDouble(-1111.1111));
+
+    // test strings
+    bloomFilter.reset();
+    const char * emptyStr = u8"";
+    const char * enStr = u8"english";
+    const char * cnStr = u8"中国字";
+
+    EXPECT_FALSE(bloomFilter.testBytes(emptyStr,
+                                       static_cast<int64_t>(strlen(emptyStr))));
+    EXPECT_FALSE(bloomFilter.testBytes(enStr,
+                                       static_cast<int64_t>(strlen(enStr))));
+    EXPECT_FALSE(bloomFilter.testBytes(cnStr,
+                                       static_cast<int64_t>(strlen(cnStr))));
+
+    bloomFilter.addBytes(emptyStr, static_cast<int64_t>(strlen(emptyStr)));
+    bloomFilter.addBytes(enStr, static_cast<int64_t>(strlen(enStr)));
+    bloomFilter.addBytes(cnStr, static_cast<int64_t>(strlen(cnStr)));
+
+    EXPECT_TRUE(bloomFilter.testBytes(emptyStr,
+                                      static_cast<int64_t>(strlen(emptyStr))));
+    EXPECT_TRUE(bloomFilter.testBytes(enStr,
+                                      static_cast<int64_t>(strlen(enStr))));
+    EXPECT_TRUE(bloomFilter.testBytes(cnStr,
+                                      static_cast<int64_t>(strlen(cnStr))));
+  }
+
+  TEST(TestBloomFilter, testBloomFilterSerialization) {
+    BloomFilterImpl emptyFilter1(128), emptyFilter2(256);
+    EXPECT_FALSE(emptyFilter1 == emptyFilter2);
+
+    BloomFilterImpl emptyFilter3(128, 0.05), emptyFilter4(128, 0.01);
+    EXPECT_FALSE(emptyFilter3 == emptyFilter4);
+
+    BloomFilterImpl srcBloomFilter(64);
+    srcBloomFilter.addLong(1);
+    srcBloomFilter.addLong(11);
+    srcBloomFilter.addLong(111);
+    srcBloomFilter.addLong(1111);
+    srcBloomFilter.addLong(0);
+    srcBloomFilter.addLong(-1);
+    srcBloomFilter.addLong(-11);
+    srcBloomFilter.addLong(-111);
+    srcBloomFilter.addLong(-1111);
+
+    proto::BloomFilter pbBloomFilter;
+    proto::ColumnEncoding encoding;
+    encoding.set_bloomencoding(1);
+
+    // serialize
+    BloomFilterUTF8Utils::serialize(srcBloomFilter, pbBloomFilter);
+
+    // deserialize
+    std::shared_ptr<BloomFilter> dstBloomFilter = BloomFilterUTF8Utils::deserialize(
+      proto::Stream_Kind_BLOOM_FILTER_UTF8, encoding, pbBloomFilter);
+
+    EXPECT_TRUE(srcBloomFilter == dynamic_cast<BloomFilterImpl&>(*dstBloomFilter));
+    EXPECT_TRUE(dstBloomFilter->testLong(1));
+    EXPECT_TRUE(dstBloomFilter->testLong(11));
+    EXPECT_TRUE(dstBloomFilter->testLong(111));
+    EXPECT_TRUE(dstBloomFilter->testLong(1111));
+    EXPECT_TRUE(dstBloomFilter->testLong(0));
+    EXPECT_TRUE(dstBloomFilter->testLong(-1));
+    EXPECT_TRUE(dstBloomFilter->testLong(-11));
+    EXPECT_TRUE(dstBloomFilter->testLong(-111));
+    EXPECT_TRUE(dstBloomFilter->testLong(-1111));
+  }
+
+}
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 12f2ed3..930becc 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1601,5 +1601,83 @@ namespace orc {
     }
   }
 
+  TEST_P(WriterTest, testBloomFilter) {
+    WriterOptions options;
+    options.setStripeSize(1024)
+      .setCompressionBlockSize(64)
+      .setCompression(CompressionKind_ZSTD)
+      .setMemoryPool(getDefaultPool())
+      .setRowIndexStride(10000)
+      .setFileVersion(fileVersion)
+      .setColumnsUseBloomFilter({1, 2});
+
+    // write 65535 rows of data
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<c1:bigint,c2:string>"));
+
+    char dataBuffer[327675]; // 300k
+    uint64_t offset = 0;
+    uint64_t rowCount = 65535;
+
+    std::unique_ptr<Writer> writer = createWriter(*type, &memStream, options);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount);
+    StructVectorBatch& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+    StringVectorBatch& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      // each row group has a unique value
+      uint64_t data = (i / options.getRowIndexStride());
+
+      // c1
+      longBatch.data[i] = static_cast<int64_t>(data);
+
+      // c2
+      std::ostringstream os;
+      os << data;
+      strBatch.data[i] = dataBuffer + offset;
+      strBatch.length[i] = static_cast<int64_t>(os.str().size());
+      memcpy(dataBuffer + offset, os.str().c_str(), os.str().size());
+      offset += os.str().size();
+    }
+
+    structBatch.numElements = rowCount;
+    longBatch.numElements = rowCount;
+    strBatch.numElements = rowCount;
+    writer->add(*batch);
+    writer->close();
+
+    // verify bloomfilters
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream(
+      memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    EXPECT_EQ(2, reader->getBloomFilters(0).size());
+    EXPECT_EQ(1, reader->getBloomFilters(0, {1}).size());
+    EXPECT_EQ(1, reader->getBloomFilters(0, {2}).size());
+
+    std::map<uint32_t, BloomFilterIndex> bfs = reader->getBloomFilters(0, {1, 2});
+    EXPECT_EQ(2, bfs.size());
+    EXPECT_EQ(7, bfs[1].entries.size());
+    EXPECT_EQ(7, bfs[2].entries.size());
+
+    // test bloomfilters
+    for (uint64_t rg = 0; rg <= rowCount / options.getRowIndexStride(); ++rg) {
+      for (uint64_t value = 0; value <= 100; ++value) {
+        std::string str = std::to_string(value);
+        if (value == rg) {
+          EXPECT_TRUE(bfs[1].entries[rg]->testLong(static_cast<int64_t>(value)));
+          EXPECT_TRUE(bfs[2].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size())));
+        } else {
+          EXPECT_FALSE(bfs[1].entries[rg]->testLong(static_cast<int64_t>(value)));
+          EXPECT_FALSE(bfs[2].entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size())));
+        }
+      }
+    }
+  }
+
   INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
 }