You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by md...@apache.org on 2019/05/02 20:49:10 UTC
[orc] branch master updated: ORC-488: [C++] Support BloomFilter in
C++ library (#379)
This is an automated email from the ASF dual-hosted git repository.
mdeepak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 61dcca9 ORC-488: [C++] Support BloomFilter in C++ library (#379)
61dcca9 is described below
commit 61dcca9d43d3da2ec03e3b984027ad2f63282430
Author: Gang Wu <ga...@alibaba-inc.com>
AuthorDate: Thu May 2 13:49:05 2019 -0700
ORC-488: [C++] Support BloomFilter in C++ library (#379)
* ORC-488: [C++] Support BloomFilter in C++ library
1. Allow the C++ writer to enable bloom filters for specific columns.
2. Allow the C++ reader to retrieve the bloom filters of specific columns.
3. Only the UTF8 bloom filter version (BLOOM_FILTER_UTF8 streams) is supported.
* fix build
* Fix types based on feedback
---
c++/include/orc/BloomFilter.hh | 45 ++++++
c++/include/orc/Common.hh | 11 ++
c++/include/orc/Reader.hh | 12 ++
c++/include/orc/Writer.hh | 26 ++++
c++/src/BloomFilter.cc | 337 +++++++++++++++++++++++++++++++++++++++++
c++/src/BloomFilter.hh | 201 ++++++++++++++++++++++++
c++/src/CMakeLists.txt | 2 +
c++/src/ColumnWriter.cc | 117 +++++++++++++-
c++/src/ColumnWriter.hh | 12 ++
c++/src/Murmur3.cc | 98 ++++++++++++
c++/src/Murmur3.hh | 40 +++++
c++/src/Reader.cc | 60 +++++++-
c++/src/Reader.hh | 3 +
c++/src/Writer.cc | 31 ++++
c++/test/CMakeLists.txt | 1 +
c++/test/TestBloomFilter.cc | 219 ++++++++++++++++++++++++++
c++/test/TestWriter.cc | 78 ++++++++++
17 files changed, 1291 insertions(+), 2 deletions(-)
diff --git a/c++/include/orc/BloomFilter.hh b/c++/include/orc/BloomFilter.hh
new file mode 100644
index 0000000..86c1288
--- /dev/null
+++ b/c++/include/orc/BloomFilter.hh
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOOMFILTER_HH
+#define ORC_BLOOMFILTER_HH
+
+#include "orc/orc-config.hh"
+
+#include <memory>
+#include <vector>
+
+namespace orc {
+
+  /**
+   * Read-only view of a bloom filter, as handed out by Reader::getBloomFilters()
+   * inside a BloomFilterIndex. A test*() call returning false means the value
+   * was definitely never added; true means it may have been added (false
+   * positives are possible).
+   */
+  class BloomFilter {
+  public:
+    virtual ~BloomFilter();
+
+    // membership probes for the three value kinds a filter can hold
+    virtual bool testBytes(const char * data, int64_t length) const = 0;
+    virtual bool testLong(int64_t data) const = 0;
+    virtual bool testDouble(double data) const = 0;
+  };
+
+  /**
+   * Bloom filters of one column within a stripe.
+   * NOTE(review): presumably one entry per row group, mirroring the writer,
+   * which records one filter per row group — confirm in Reader.cc.
+   */
+  struct BloomFilterIndex {
+    std::vector<std::shared_ptr<BloomFilter>> entries;
+  };
+
+}  // namespace orc
+
+#endif //ORC_BLOOMFILTER_HH
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index 9bfa82d..c4523af 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -262,6 +262,17 @@ namespace orc {
}
return false;
}
+
+ /**
+ * Bloom filter stream layout to write.
+ * The numeric value of UTF8 is what the C++ writer stores in
+ * ColumnEncoding.bloomEncoding, and the reader only accepts encodings whose
+ * bloomEncoding equals 1. ColumnWriter only builds bloom filters when
+ * WriterOptions::getBloomFilterVersion() returns UTF8.
+ */
+ enum BloomFilterVersion {
+ // Include both the BLOOM_FILTER and BLOOM_FILTER_UTF8 streams to support
+ // both old and new readers.
+ ORIGINAL = 0,
+ // Only include the BLOOM_FILTER_UTF8 streams that consistently use UTF8.
+ // See ORC-101
+ UTF8 = 1,
+ // Presumably a sentinel for versions newer than this library understands
+ // (TODO confirm). Because this unscoped enum has no fixed underlying type,
+ // INT64_MAX forces that type to be at least 64 bits wide.
+ FUTURE = INT64_MAX
+ };
+
}
#endif
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 32549b5..d5f8541 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -19,13 +19,16 @@
#ifndef ORC_READER_HH
#define ORC_READER_HH
+#include "orc/BloomFilter.hh"
#include "orc/Common.hh"
#include "orc/orc-config.hh"
#include "orc/Statistics.hh"
#include "orc/Type.hh"
#include "orc/Vector.hh"
+#include <map>
#include <memory>
+#include <set>
#include <string>
#include <vector>
@@ -472,6 +475,15 @@ namespace orc {
*/
virtual uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) = 0;
+ /**
+ * Get BloomFilters of all selected columns in the specified stripe.
+ * @param stripeIndex index of the stripe to be read for bloom filters.
+ * @param included index of selected columns to return (if not specified,
+ * all columns that have bloom filters are considered).
+ * @return map of bloom filters with the key standing for the index of column.
+ * NOTE(review): only BLOOM_FILTER_UTF8 filters can be decoded; confirm in
+ * Reader.cc whether unsupported filters are omitted or stored as null entries.
+ * The default argument ({}) is bound statically from this declaration, so
+ * overriding implementations must not rely on a different default.
+ */
+ virtual std::map<uint32_t, BloomFilterIndex>
+ getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 0;
};
/**
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index a333a23..5b33386 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -25,6 +25,7 @@
#include "orc/Vector.hh"
#include <memory>
+#include <set>
#include <string>
#include <vector>
@@ -191,6 +192,31 @@ namespace orc {
* @return if not set, the default is false
*/
bool getEnableDictionary() const;
+
+ /**
+ * Set the ids of the columns whose writers should build bloom filters.
+ * Each ColumnWriter checks its own columnId against this set; filters are
+ * only actually written when getBloomFilterVersion() returns UTF8.
+ * @param columns column ids to enable bloom filters for
+ * @return reference to the options (presumably *this, for chaining — confirm)
+ */
+ WriterOptions& setColumnsUseBloomFilter(const std::set<uint64_t>& columns);
+
+ /**
+ * Get whether the column with the given id uses BloomFilter.
+ */
+ bool isColumnUseBloomFilter(uint64_t column) const;
+
+ /**
+ * Set false positive probability of BloomFilter.
+ * The value is passed to BloomFilterImpl when a column writer is created,
+ * and BloomFilterImpl throws std::logic_error unless 0.0 < fpp < 1.0.
+ * NOTE(review): this setter may not validate the value itself — confirm.
+ */
+ WriterOptions& setBloomFilterFPP(double fpp);
+
+ /**
+ * Get false positive probability of BloomFilter
+ */
+ double getBloomFilterFPP() const;
+
+ /**
+ * Get version of BloomFilter.
+ * ColumnWriter only enables bloom filters when this returns UTF8.
+ */
+ BloomFilterVersion getBloomFilterVersion() const;
};
class Writer {
diff --git a/c++/src/BloomFilter.cc b/c++/src/BloomFilter.cc
new file mode 100644
index 0000000..020a271
--- /dev/null
+++ b/c++/src/BloomFilter.cc
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BloomFilter.hh"
+#include "Murmur3.hh"
+
+namespace orc {
+
+  constexpr uint64_t BITS_OF_LONG = 64;
+  constexpr uint8_t SHIFT_6_BITS = 6;
+  constexpr uint8_t SHIFT_3_BITS = 3;
+
+  /**
+   * Returns true when the host stores the least-significant byte of a
+   * multi-byte integer first. The first byte of a known 32-bit pattern is
+   * copied out with memcpy, which is well-defined; reading an inactive union
+   * member (the previous approach) is type-punning UB in C++.
+   */
+  static bool isLittleEndian() {
+    const uint32_t probe = 0x01020304;
+    unsigned char firstByte = 0;
+    memcpy(&firstByte, &probe, 1);
+    return firstByte == 4;
+  }
+
+  /**
+   * Implementation of BitSet
+   */
+  BitSet::BitSet(uint64_t numBits) {
+    // Round up to whole 64-bit words. Integer division is exact for every
+    // uint64_t, whereas ceil() on a double loses precision above 2^53 and
+    // needlessly goes through floating point.
+    const uint64_t numWords =
+      (numBits >> SHIFT_6_BITS) + ((numBits % BITS_OF_LONG) != 0 ? 1 : 0);
+    mData.resize(static_cast<size_t>(numWords), 0);
+  }
+
+  BitSet::BitSet(const uint64_t * bits, uint64_t numBits) {
+    // caller should make sure numBits is multiple of 64
+    mData.resize(numBits >> SHIFT_6_BITS, 0);
+    memcpy(mData.data(), bits, numBits >> SHIFT_3_BITS);
+  }
+
+  void BitSet::set(uint64_t index) {
+    mData[index >> SHIFT_6_BITS] |= (1ULL << (index % BITS_OF_LONG));
+  }
+
+  bool BitSet::get(uint64_t index) {
+    return (mData[index >> SHIFT_6_BITS] & (1ULL << (index % BITS_OF_LONG))) != 0;
+  }
+
+  uint64_t BitSet::bitSize() {
+    return mData.size() << SHIFT_6_BITS;
+  }
+
+  // bitwise OR of 'other' into this set; both must have the same word count
+  void BitSet::merge(const BitSet& other) {
+    if (mData.size() != other.mData.size()) {
+      std::stringstream ss;
+      ss << "BitSet must be of equal length ("
+         << mData.size() << " != " << other.mData.size() << ")";
+      throw std::logic_error(ss.str());
+    }
+
+    for (size_t i = 0; i != mData.size(); i++) {
+      mData[i] |= other.mData[i];
+    }
+  }
+
+  void BitSet::clear() {
+    memset(mData.data(), 0, sizeof(uint64_t) * mData.size());
+  }
+
+  const uint64_t * BitSet::getData() const {
+    return mData.data();
+  }
+
+  bool BitSet::operator==(const BitSet& other) const {
+    return mData == other.mData;
+  }
+
+  /**
+   * Helper functions
+   */
+
+  // Throws std::logic_error carrying 'message' when 'expression' is false.
+  void checkArgument(bool expression, const std::string& message) {
+    if (!expression) {
+      throw std::logic_error(message);
+    }
+  }
+
+  // Number of hash functions k = round(m / n * ln 2), never less than 1.
+  // Caller guarantees expectedEntries > 0.
+  int32_t optimalNumOfHashFunctions(uint64_t expectedEntries, uint64_t numBits) {
+    double n = static_cast<double>(expectedEntries);
+    return std::max<int32_t>(1, static_cast<int32_t>(
+      std::round(static_cast<double>(numBits) / n * std::log(2.0))));
+  }
+
+  // Number of bits m = -n * ln(p) / (ln 2)^2 for n entries at probability p.
+  int32_t optimalNumOfBits(uint64_t expectedEntries, double fpp) {
+    const double n = static_cast<double>(expectedEntries);
+    const double bits = -n * std::log(fpp) / (std::log(2.0) * std::log(2.0));
+    // Converting a double that does not fit in int32_t is undefined behavior
+    // in C++ (e.g. 1e9 entries at fpp 0.05 needs ~6.2e9 bits). Saturate
+    // instead, which is what Java's (int) narrowing of a double does.
+    if (bits >= static_cast<double>(INT32_MAX)) {
+      return INT32_MAX;
+    }
+    return static_cast<int32_t>(bits);
+  }
+
+  // Emulates Java's '>>' on a long: an arithmetic shift that replicates the
+  // sign bit. C++ '>>' on uint64_t is a logical shift (like Java's '>>>'),
+  // so it must not be used directly. 'shift' must be in [1, 63].
+  inline uint64_t javaSignedShiftRight(uint64_t value, uint32_t shift) {
+    const uint64_t signFill = (value >> 63) != 0 ? ~static_cast<uint64_t>(0) : 0;
+    return (value >> shift) | (signFill << (64 - shift));
+  }
+
+  // Thomas Wang's integer hash function
+  // http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+  // The reference (Java) implementation works on a signed long and uses the
+  // arithmetic '>>', so the right shifts below emulate it. Bloom filters must
+  // hash identically in C++ and Java for files to be readable by either.
+  // Left shifts, '~' and '+' wrap identically on uint64_t and Java long.
+  inline uint64_t getLongHash(uint64_t key) {
+    key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+    key = key ^ javaSignedShiftRight(key, 24);
+    key = (key + (key << 3)) + (key << 8); // key * 265
+    key = key ^ javaSignedShiftRight(key, 14);
+    key = (key + (key << 2)) + (key << 4); // key * 21
+    key = key ^ javaSignedShiftRight(key, 28);
+    key = key + (key << 31);
+    return key;
+  }
+
+  // Hashes a byte sequence with 64-bit Murmur3; a null pointer maps to
+  // Murmur3::NULL_HASHCODE. Note that 'length' is narrowed to uint32_t.
+  // Only this single 64-bit hash is computed per value: addHash()/testHash()
+  // split it into two 32-bit halves and combine them using the trick from
+  // "Less Hashing, Same Performance: Building a Better Bloom Filter" by
+  // Kirsch et al., which shows two hash functions are enough without any loss
+  // in the asymptotic false positive probability.
+  inline uint64_t getBytesHash(const char * data, int64_t length) {
+    if (data != nullptr) {
+      const uint8_t * bytes = reinterpret_cast<const uint8_t *>(data);
+      return Murmur3::hash64(bytes, static_cast<uint32_t>(length));
+    }
+    return Murmur3::NULL_HASHCODE;
+  }
+
+  /**
+   * Implementation of BloomFilter
+   */
+
+  // Sizes the filter for 'expectedEntries' values at false positive
+  // probability 'fpp'; throws std::logic_error on out-of-range arguments.
+  BloomFilterImpl::BloomFilterImpl(uint64_t expectedEntries, double fpp) {
+    checkArgument(expectedEntries > 0,
+                  "expectedEntries should be > 0");
+    checkArgument(fpp > 0.0 && fpp < 1.0,
+                  "False positive probability should be > 0.0 & < 1.0");
+
+    const uint64_t rawBits =
+      static_cast<uint64_t>(optimalNumOfBits(expectedEntries, fpp));
+    // pad to a multiple of 64; an already aligned count still gains one full
+    // word (same rounding as the Java implementation)
+    const uint64_t padding = BITS_OF_LONG - rawBits % BITS_OF_LONG;
+    mNumBits = rawBits + padding;
+    mNumHashFunctions = optimalNumOfHashFunctions(expectedEntries, mNumBits);
+    mBitSet.reset(new BitSet(mNumBits));
+  }
+
+  void BloomFilterImpl::addBytes(const char * data, int64_t length) {
+    addHash(getBytesHash(data, length));
+  }
+
+  void BloomFilterImpl::addLong(int64_t data) {
+    const uint64_t hash = getLongHash(static_cast<uint64_t>(data));
+    addHash(hash);
+  }
+
+  bool BloomFilterImpl::testBytes(const char * data, int64_t length) const {
+    return testHash(getBytesHash(data, length));
+  }
+
+  bool BloomFilterImpl::testLong(int64_t data) const {
+    const uint64_t hash = getLongHash(static_cast<uint64_t>(data));
+    return testHash(hash);
+  }
+
+  // serialized size: eight bits per byte
+  uint64_t BloomFilterImpl::sizeInBytes() const {
+    return getBitSize() / 8;
+  }
+
+  uint64_t BloomFilterImpl::getBitSize() const {
+    return mBitSet->bitSize();
+  }
+
+  int32_t BloomFilterImpl::getNumHashFunctions() const {
+    return mNumHashFunctions;
+  }
+
+  // Mirrors Java's Double.doubleToLongBits(): returns the IEEE-754 bit
+  // pattern of 'value', but collapses every NaN to the canonical quiet NaN
+  // 0x7ff8000000000000 so that all NaNs hash the same way they do in Java
+  // (e.g. 0.0/0.0 on x86 yields 0xfff8000000000000). memcpy replaces the
+  // former reinterpret_cast, which was type-punning UB.
+  static int64_t doubleToLongBits(double value) {
+    if (std::isnan(value)) {
+      return 0x7ff8000000000000LL;
+    }
+    int64_t bits = 0;
+    memcpy(&bits, &value, sizeof(bits));
+    return bits;
+  }
+
+  /**
+   * Rebuilds a BloomFilter from its protobuf form.
+   *
+   * utf8bitset holds the bit words as consecutive little-endian uint64 values
+   * (ORC specification), so on big-endian hosts every word is reassembled
+   * byte by byte. The former big-endian path shifted the 32-bit mask 0xFFu by
+   * up to 56 bits, which is undefined behavior and lost the high bytes.
+   *
+   * @throws std::logic_error if the bitset is empty (hash positions are taken
+   *         modulo the bit count) or its size is not a multiple of 8 bytes.
+   *         Other fields are trusted; the caller must validate them.
+   */
+  BloomFilterImpl::BloomFilterImpl(const proto::BloomFilter& bloomFilter) {
+    mNumHashFunctions = static_cast<int32_t>(bloomFilter.numhashfunctions());
+
+    const std::string& bitsetStr = bloomFilter.utf8bitset();
+    mNumBits = bitsetStr.size() << SHIFT_3_BITS;
+    checkArgument(mNumBits > 0, "numBits should be > 0!");
+    checkArgument(mNumBits % BITS_OF_LONG == 0, "numBits should be multiple of 64!");
+
+    std::vector<uint64_t> longs(static_cast<size_t>(mNumBits >> SHIFT_6_BITS), 0);
+    if (isLittleEndian()) {
+      // the serialized bytes already match the host's in-memory word layout
+      memcpy(longs.data(), bitsetStr.data(), bitsetStr.size());
+    } else {
+      const unsigned char * bytes =
+        reinterpret_cast<const unsigned char *>(bitsetStr.data());
+      for (size_t i = 0; i != longs.size(); ++i) {
+        uint64_t word = 0;
+        for (size_t b = 0; b != sizeof(uint64_t); ++b) {
+          word |= static_cast<uint64_t>(bytes[i * sizeof(uint64_t) + b]) << (b * 8);
+        }
+        longs[i] = word;
+      }
+    }
+    mBitSet.reset(new BitSet(longs.data(), mNumBits));
+  }
+
+  void BloomFilterImpl::addDouble(double data) {
+    addLong(doubleToLongBits(data));
+  }
+
+  bool BloomFilterImpl::testDouble(double data) const {
+    return testLong(doubleToLongBits(data));
+  }
+
+  // Bit probed by hash function number 'i' (1-based) for 'hash64', using
+  // double hashing: h1 + i * h2 with h1/h2 the low/high 32 bits of hash64.
+  // The sum is computed in uint32_t so it wraps exactly like Java's int
+  // arithmetic; doing it in int32_t relies on signed overflow, which is
+  // undefined behavior in C++. (uint32_t -> int32_t is modular on all
+  // supported compilers and guaranteed so since C++20.)
+  static uint64_t getBitPosition(uint64_t hash64, int32_t i, uint64_t numBits) {
+    const uint32_t hash1 = static_cast<uint32_t>(hash64 & 0xffffffff);
+    const uint32_t hash2 = static_cast<uint32_t>(hash64 >> 32);
+    int32_t combinedHash =
+      static_cast<int32_t>(hash1 + static_cast<uint32_t>(i) * hash2);
+    // hashcode should be positive, flip all the bits if it's negative
+    if (combinedHash < 0) {
+      combinedHash = ~combinedHash;
+    }
+    return static_cast<uint64_t>(combinedHash) % numBits;
+  }
+
+  // sets the mNumHashFunctions bits derived from hash64
+  void BloomFilterImpl::addHash(uint64_t hash64) {
+    for (int32_t i = 1; i <= mNumHashFunctions; ++i) {
+      mBitSet->set(getBitPosition(hash64, i, mNumBits));
+    }
+  }
+
+  // true only if every bit derived from hash64 is set
+  bool BloomFilterImpl::testHash(uint64_t hash64) const {
+    for (int32_t i = 1; i <= mNumHashFunctions; ++i) {
+      if (!mBitSet->get(getBitPosition(hash64, i, mNumBits))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // ORs 'other' into this filter; both must share bit count and hash count.
+  void BloomFilterImpl::merge(const BloomFilterImpl& other) {
+    if (mNumBits != other.mNumBits || mNumHashFunctions != other.mNumHashFunctions) {
+      std::stringstream ss;
+      ss << "BloomFilters are not compatible for merging: "
+         << "this: numBits:" << mNumBits
+         << ",numHashFunctions:" << mNumHashFunctions
+         << ", that: numBits:" << other.mNumBits
+         << ",numHashFunctions:" << other.mNumHashFunctions;
+      throw std::logic_error(ss.str());
+    }
+
+    mBitSet->merge(*other.mBitSet);
+  }
+
+  void BloomFilterImpl::reset() {
+    mBitSet->clear();
+  }
+
+  void BloomFilterImpl::serialize(proto::BloomFilter& bloomFilter) const {
+    bloomFilter.set_numhashfunctions(static_cast<uint32_t>(mNumHashFunctions));
+
+    // According to ORC standard, the encoding is a sequence of bytes with
+    // a little endian encoding in the utf8bitset field.
+    if (isLittleEndian()) {
+      // bytes are already organized in little endian; thus no conversion needed
+      const char * bitset = reinterpret_cast<const char *>(mBitSet->getData());
+      bloomFilter.set_utf8bitset(bitset, sizeInBytes());
+    } else {
+      // Emit each word least-significant byte first. This is independent of
+      // the host byte order and avoids the former '0xFFu << bit' mask, which
+      // is undefined behavior for shifts of 32 bits or more.
+      const uint64_t * longs = mBitSet->getData();
+      const size_t numWords = static_cast<size_t>(sizeInBytes() / sizeof(uint64_t));
+      std::string bytes(numWords * sizeof(uint64_t), '\0');
+      for (size_t i = 0; i != numWords; ++i) {
+        for (size_t b = 0; b != sizeof(uint64_t); ++b) {
+          bytes[i * sizeof(uint64_t) + b] =
+            static_cast<char>((longs[i] >> (b * 8)) & 0xFF);
+        }
+      }
+      bloomFilter.set_utf8bitset(bytes);
+    }
+  }
+
+  bool BloomFilterImpl::operator==(const BloomFilterImpl& other) const {
+    return mNumBits == other.mNumBits &&
+      mNumHashFunctions == other.mNumHashFunctions &&
+      *mBitSet == *other.mBitSet;
+  }
+
+  BloomFilter::~BloomFilter() {
+    // PASS
+  }
+
+  /**
+   * Rebuilds a BloomFilter read from a BLOOM_FILTER_UTF8 stream.
+   *
+   * Returns nullptr (rather than throwing) when the filter is not usable:
+   * - the stream is not BLOOM_FILTER_UTF8,
+   * - the column encoding does not carry bloomencoding 1,
+   * - the protobuf lacks numhashfunctions or utf8bitset, or the bitset is
+   *   empty. An empty filter could not be probed, because BloomFilterImpl
+   *   reduces hashes modulo its bit count.
+   */
+  std::unique_ptr<BloomFilter> BloomFilterUTF8Utils::deserialize(
+    const proto::Stream_Kind& streamKind,
+    const proto::ColumnEncoding& encoding,
+    const proto::BloomFilter& bloomFilter) {
+    // only BLOOM_FILTER_UTF8 is supported
+    if (streamKind != proto::Stream_Kind_BLOOM_FILTER_UTF8) {
+      return nullptr;
+    }
+
+    // make sure we don't use unknown encodings or original timestamp encodings
+    if (!encoding.has_bloomencoding() || encoding.bloomencoding() != 1) {
+      return nullptr;
+    }
+
+    // make sure all required fields exist and the bitset is non-empty
+    if (!bloomFilter.has_numhashfunctions() || !bloomFilter.has_utf8bitset() ||
+        bloomFilter.utf8bitset().empty()) {
+      return nullptr;
+    }
+
+    return std::unique_ptr<BloomFilter>(new BloomFilterImpl(bloomFilter));
+  }
+
+}
diff --git a/c++/src/BloomFilter.hh b/c++/src/BloomFilter.hh
new file mode 100644
index 0000000..8c0d73a
--- /dev/null
+++ b/c++/src/BloomFilter.hh
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_BLOOMFILTER_IMPL_HH
+#define ORC_BLOOMFILTER_IMPL_HH
+
+#include "orc/BloomFilter.hh"
+#include "wrap/orc-proto-wrapper.hh"
+
+#include <cmath>
+#include <sstream>
+#include <vector>
+
+namespace orc {
+
+  /**
+   * Bare metal bit set implementation. For performance reasons, this implementation does not check
+   * for index bounds nor expand the bit set size if the specified index is greater than the size.
+   */
+  class BitSet {
+  public:
+    /**
+     * Creates an empty BitSet. Explicit so that an integer is never silently
+     * converted into a bit set.
+     *
+     * @param numBits - number of bits used; storage is rounded up to whole
+     *                  64-bit words
+     */
+    explicit BitSet(uint64_t numBits);
+
+    /**
+     * Creates BitSet from serialized uint64_t buffer
+     *
+     * @param bits - serialized uint64_t buffer of bitset (copied)
+     * @param numBits - number of bits used; caller must pass a multiple of 64
+     */
+    BitSet(const uint64_t * bits, uint64_t numBits);
+
+    /**
+     * Sets the bit at specified index (no bounds check).
+     *
+     * @param index - position
+     */
+    void set(uint64_t index);
+
+    /**
+     * Returns true if the bit is set in the specified index (no bounds check).
+     *
+     * @param index - position
+     * @return - value at the bit position
+     */
+    bool get(uint64_t index);
+
+    /**
+     * Number of bits (always a multiple of 64)
+     */
+    uint64_t bitSize();
+
+    /**
+     * Combines the two BitSets using bitwise OR.
+     * @throws std::logic_error if the two sets have different sizes
+     */
+    void merge(const BitSet& other);
+
+    /**
+     * Clears the bit set.
+     */
+    void clear();
+
+    /**
+     * Gets underlying raw data
+     */
+    const uint64_t * getData() const;
+
+    /**
+     * Compares two BitSets
+     */
+    bool operator==(const BitSet& other) const;
+
+  private:
+    std::vector<uint64_t> mData;
+  };
+
+  /**
+   * BloomFilter is a probabilistic data structure for set membership checks
+   * and is far more space efficient than a hash set. Because of its
+   * probabilistic nature, false positives are possible (test*() returns true
+   * for an element that was never added) but false negatives are not
+   * (test*() never returns false for an element that was added). The false
+   * positive probability is configurable (default: 5%, see DEFAULT_FPP); the
+   * lower the probability, the more space is required.
+   *
+   * Bloom filters are sensitive to the number of inserted elements. The
+   * expected number of entries must be given at construction time; inserting
+   * more than that raises the false positive probability accordingly.
+   *
+   * Byte sequences are hashed with 64-bit Murmur3 and integers with Thomas
+   * Wang's 64-bit integer hash. Each 64-bit hash is expanded into
+   * getNumHashFunctions() bit positions by double hashing.
+   *
+   * Filters are converted to and from the protobuf form stored in
+   * BLOOM_FILTER_UTF8 streams by BloomFilterUTF8Utils.
+   */
+  class BloomFilterImpl : public BloomFilter {
+  public:
+    /**
+     * Creates an empty BloomFilter
+     *
+     * @param expectedEntries - number of entries it will hold (must be > 0)
+     * @param fpp - false positive probability (must be in (0.0, 1.0))
+     * @throws std::logic_error if either argument is out of range
+     */
+    BloomFilterImpl(uint64_t expectedEntries, double fpp);
+
+    /**
+     * Creates an empty BloomFilter with DEFAULT_FPP. Explicit so that an
+     * integer is never silently converted into a filter.
+     */
+    explicit BloomFilterImpl(uint64_t expectedEntries)
+      : BloomFilterImpl(expectedEntries, DEFAULT_FPP) {
+      // PASS
+    }
+
+    /**
+     * Creates a BloomFilter by deserializing the proto-buf version.
+     *
+     * The bitset size is validated (std::logic_error on failure); the caller
+     * is responsible for the validity of the remaining fields.
+     */
+    explicit BloomFilterImpl(const proto::BloomFilter& bloomFilter);
+
+    /**
+     * Adds a new element to the BloomFilter
+     */
+    void addBytes(const char * data, int64_t length);
+    void addLong(int64_t data);
+    void addDouble(double data);
+
+    /**
+     * Test if the element exists in BloomFilter
+     */
+    bool testBytes(const char * data, int64_t length) const override;
+    bool testLong(int64_t data) const override;
+    bool testDouble(double data) const override;
+
+    uint64_t sizeInBytes() const;
+    uint64_t getBitSize() const;
+    int32_t getNumHashFunctions() const;
+
+    /**
+     * ORs 'other' into this filter.
+     * @throws std::logic_error if bit counts or hash counts differ
+     */
+    void merge(const BloomFilterImpl& other);
+
+    /**
+     * Clears all bits, keeping the size and hash count.
+     */
+    void reset();
+
+    bool operator==(const BloomFilterImpl& other) const;
+
+  private:
+    friend struct BloomFilterUTF8Utils;
+
+    // compute k hash values from hash64 and set bits
+    void addHash(uint64_t hash64);
+
+    // compute k hash values from hash64 and check bits
+    bool testHash(uint64_t hash64) const;
+
+    void serialize(proto::BloomFilter& bloomFilter) const;
+
+  private:
+    static constexpr double DEFAULT_FPP = 0.05;
+    uint64_t mNumBits;
+    int32_t mNumHashFunctions;
+    std::unique_ptr<BitSet> mBitSet;
+  };
+
+  /**
+   * Converts between BloomFilterImpl and the protobuf message stored in
+   * BLOOM_FILTER_UTF8 streams.
+   */
+  struct BloomFilterUTF8Utils {
+    // Writes 'filter' (hash count and little-endian bitset) into 'pbFilter'.
+    static void serialize(const BloomFilterImpl& filter, proto::BloomFilter& pbFilter) {
+      filter.serialize(pbFilter);
+    }
+
+    // Builds a filter from 'pbFilter'. Returns nullptr instead of throwing
+    // when the stream, column encoding or message is not a supported
+    // BLOOM_FILTER_UTF8 filter.
+    static std::unique_ptr<BloomFilter>
+    deserialize(const proto::Stream_Kind& kind,
+                const proto::ColumnEncoding& encoding,
+                const proto::BloomFilter& pbFilter);
+  };
+
+}
+
+#endif //ORC_BLOOMFILTER_IMPL_HH
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 235ced8..1991f74 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -186,6 +186,7 @@ set(SOURCE_FILES
io/OutputStream.cc
wrap/orc-proto-wrapper.cc
Adaptor.cc
+ BloomFilter.cc
ByteRLE.cc
ColumnPrinter.cc
ColumnReader.cc
@@ -196,6 +197,7 @@ set(SOURCE_FILES
Int128.cc
LzoDecompressor.cc
MemoryPool.cc
+ Murmur3.cc
OrcFile.cc
Reader.cc
RLEv1.cc
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 8c21ed3..30d96ac 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -97,8 +97,10 @@ namespace orc {
rowIndex(),
rowIndexEntry(),
rowIndexPosition(),
+ enableBloomFilter(false),
memPool(*options.getMemoryPool()),
- indexStream() {
+ indexStream(),
+ bloomFilterStream() {
std::unique_ptr<BufferedOutputStream> presentStream =
factory.createStream(proto::Stream_Kind_PRESENT);
@@ -116,6 +118,16 @@ namespace orc {
new RowIndexPositionRecorder(*rowIndexEntry));
indexStream =
factory.createStream(proto::Stream_Kind_ROW_INDEX);
+
+ // BloomFilters for non-UTF8 strings and non-UTC timestamps are not supported
+ if (options.isColumnUseBloomFilter(columnId)
+ && options.getBloomFilterVersion() == BloomFilterVersion::UTF8) {
+ enableBloomFilter = true;
+ bloomFilter.reset(new BloomFilterImpl(
+ options.getRowIndexStride(), options.getBloomFilterFPP()));
+ bloomFilterIndex.reset(new proto::BloomFilterIndex());
+ bloomFilterStream = factory.createStream(proto::Stream_Kind_BLOOM_FILTER_UTF8);
+ }
}
}
@@ -174,9 +186,18 @@ namespace orc {
colStripeStatistics->merge(*colIndexStatistics);
colIndexStatistics->reset();
+ addBloomFilterEntry();
+
recordPosition();
}
+  // Appends the current row group's filter to the column's BloomFilterIndex
+  // and clears it for the next row group (no-op when bloom filters are off).
+  void ColumnWriter::addBloomFilterEntry() {
+    if (!enableBloomFilter) {
+      return;
+    }
+    proto::BloomFilter* entry = bloomFilterIndex->add_bloomfilter();
+    BloomFilterUTF8Utils::serialize(*bloomFilter, *entry);
+    bloomFilter->reset();
+  }
+
void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
// write row index to output stream
rowIndex->SerializeToZeroCopyStream(indexStream.get());
@@ -187,6 +208,17 @@ namespace orc {
stream.set_column(static_cast<uint32_t>(columnId));
stream.set_length(indexStream->flush());
streams.push_back(stream);
+
+ // write BLOOM_FILTER_UTF8 stream
+ if (enableBloomFilter) {
+ if (!bloomFilterIndex->SerializeToZeroCopyStream(bloomFilterStream.get())) {
+ throw std::logic_error("Failed to write bloom filter stream.");
+ }
+ stream.set_kind(proto::Stream_Kind_BLOOM_FILTER_UTF8);
+ stream.set_column(static_cast<uint32_t>(columnId));
+ stream.set_length(bloomFilterStream->flush());
+ streams.push_back(stream);
+ }
}
void ColumnWriter::recordPosition() const {
@@ -203,6 +235,11 @@ namespace orc {
// write current positions
recordPosition();
}
+
+ if (enableBloomFilter) {
+ bloomFilter->reset();
+ bloomFilterIndex->clear_bloomfilter();
+ }
}
void ColumnWriter::writeDictionary() {
@@ -474,6 +511,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull == nullptr || notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
intStats->update(data[i], 1);
}
}
@@ -504,6 +544,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -580,6 +623,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull == nullptr || notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
intStats->update(static_cast<int64_t>(byteData[i]), 1);
}
}
@@ -610,6 +656,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -686,6 +735,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull == nullptr || notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
boolStats->update(byteData[i] != 0, 1);
}
}
@@ -716,6 +768,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -811,6 +866,9 @@ namespace orc {
}
dataStream->write(data, bytes);
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addDouble(doubleData[i]);
+ }
doubleStats->update(doubleData[i]);
}
}
@@ -841,6 +899,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -1112,6 +1173,9 @@ namespace orc {
} else {
directDataStream->write(data[i], len);
}
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], static_cast<int64_t>(len));
+ }
strStats->update(data[i], len);
++count;
}
@@ -1187,6 +1251,9 @@ namespace orc {
proto::ColumnEncoding_Kind_DICTIONARY_V2);
}
encoding.set_dictionarysize(static_cast<uint32_t>(dictionary.size()));
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -1489,6 +1556,9 @@ namespace orc {
directDataStream->write(charData, static_cast<size_t>(length[i]));
}
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], length[i]);
+ }
strStats->update(charData, static_cast<size_t>(length[i]));
++count;
}
@@ -1559,6 +1629,9 @@ namespace orc {
directDataStream->write(data[i], static_cast<size_t>(length[i]));
}
+ if (enableBloomFilter) {
+ bloomFilter->addBytes(data[i], length[i]);
+ }
strStats->update(data[i], static_cast<size_t>(length[i]));
++count;
}
@@ -1733,6 +1806,9 @@ namespace orc {
// TimestampVectorBatch already stores data in UTC
int64_t millsUTC = secs[i] * 1000 + nanos[i] / 1000000;
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(millsUTC);
+ }
tsStats->update(millsUTC);
if (secs[i] < 0 && nanos[i] != 0) {
@@ -1780,6 +1856,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -1838,6 +1917,9 @@ namespace orc {
if (!notNull || notNull[i]) {
++count;
dateStats->update(static_cast<int32_t>(data[i]));
+ if (enableBloomFilter) {
+ bloomFilter->addLong(data[i]);
+ }
}
}
dateStats->increase(count);
@@ -1942,6 +2024,12 @@ namespace orc {
}
valueStream->write(buffer, static_cast<size_t>(data - buffer));
++count;
+ if (enableBloomFilter) {
+ std::string decimal = Decimal(
+ values[i], static_cast<int32_t>(scale)).toString();
+ bloomFilter->addBytes(
+ decimal.c_str(), static_cast<int64_t>(decimal.size()));
+ }
decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
}
}
@@ -1981,6 +2069,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
}
@@ -2067,6 +2158,12 @@ namespace orc {
valueStream->write(buffer, static_cast<size_t>(data - buffer));
++count;
+ if (enableBloomFilter) {
+ std::string decimal = Decimal(
+ values[i], static_cast<int32_t>(scale)).toString();
+ bloomFilter->addBytes(
+ decimal.c_str(), static_cast<int64_t>(decimal.size()));
+ }
decStats->update(Decimal(values[i], static_cast<int32_t>(scale)));
}
}
@@ -2188,6 +2285,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(offsets[i]);
+ }
}
}
colIndexStatistics->increase(count);
@@ -2233,6 +2333,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
if (child.get()) {
child->getColumnEncoding(encodings);
@@ -2412,6 +2515,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(offsets[i]);
+ }
}
}
colIndexStatistics->increase(count);
@@ -2467,6 +2573,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
if (keyWriter.get()) {
keyWriter->getColumnEncoding(encodings);
@@ -2668,6 +2777,9 @@ namespace orc {
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
++count;
+ if (enableBloomFilter) {
+ bloomFilter->addLong(tags[i]);
+ }
}
}
colIndexStatistics->increase(count);
@@ -2713,6 +2825,9 @@ namespace orc {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
encodings.push_back(encoding);
for (uint32_t i = 0; i < children.size(); ++i) {
children[i]->getColumnEncoding(encodings);
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 2364066..cbbb5d0 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -21,6 +21,7 @@
#include "orc/Vector.hh"
+#include "BloomFilter.hh"
#include "ByteRLE.hh"
#include "Compression.hh"
#include "orc/Exceptions.hh"
@@ -82,6 +83,11 @@ namespace orc {
std::unique_ptr<proto::RowIndexEntry> rowIndexEntry;
std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition;
+ // bloom filters are recorded per row group
+ bool enableBloomFilter;
+ std::unique_ptr<BloomFilterImpl> bloomFilter;
+ std::unique_ptr<proto::BloomFilterIndex> bloomFilterIndex;
+
public:
ColumnWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options);
@@ -153,6 +159,11 @@ namespace orc {
virtual void createRowIndexEntry();
/**
+ * Create a new BloomFilter entry and add the previous one to BloomFilterIndex
+ */
+ virtual void addBloomFilterEntry();
+
+ /**
* Write row index streams for this column.
* @param streams output list of ROW_INDEX streams
*/
@@ -195,6 +206,7 @@ namespace orc {
protected:
MemoryPool& memPool;
std::unique_ptr<BufferedOutputStream> indexStream;
+ std::unique_ptr<BufferedOutputStream> bloomFilterStream;
};
/**
diff --git a/c++/src/Murmur3.cc b/c++/src/Murmur3.cc
new file mode 100644
index 0000000..b45bd6d
--- /dev/null
+++ b/c++/src/Murmur3.cc
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Adaptor.hh"
+#include "Murmur3.hh"
+
+namespace orc {
+
+  namespace {
+
+    /**
+     * Rotate x left by r bits; callers pass 0 < r < 64.
+     * Replaces the former ROTL64 macro (whose arguments were not
+     * parenthesized) and the unused duplicate rotl64 helper.
+     */
+    inline uint64_t rotl64(uint64_t x, uint32_t r) {
+      return (x << r) | (x >> (64 - r));
+    }
+
+    /**
+     * Read the 8 bytes starting at p as a little-endian 64-bit value.
+     *
+     * The previous code cast the byte pointer to const uint64_t* and indexed
+     * it. That is an unaligned access whenever data is not 8-byte aligned,
+     * violates strict aliasing, and yields host-endian values. Assembling the
+     * value byte by byte has none of those problems. On little-endian hosts
+     * it produces exactly the value the old load did.
+     * NOTE(review): presumably hashes must match the Java ORC Murmur3 so that
+     * bloom filters are portable; Java reads blocks little-endian.
+     */
+    inline uint64_t readLittleEndian64(const uint8_t* p) {
+      uint64_t result = 0;
+      for (uint32_t b = 8; b > 0; --b) {
+        result = (result << 8) | static_cast<uint64_t>(p[b - 1]);
+      }
+      return result;
+    }
+
+  }  // namespace
+
+  /**
+   * Final avalanche mix applied to the accumulated hash.
+   */
+  inline uint64_t Murmur3::fmix64(uint64_t value) {
+    value ^= (value >> 33);
+    value *= 0xff51afd7ed558ccdULL;
+    value ^= (value >> 33);
+    value *= 0xc4ceb9fe1a85ec53ULL;
+    value ^= (value >> 33);
+    return value;
+  }
+
+  /**
+   * Hash len bytes starting at data using DEFAULT_SEED.
+   */
+  uint64_t Murmur3::hash64(const uint8_t *data, uint32_t len) {
+    return hash64(data, len, DEFAULT_SEED);
+  }
+
+  /**
+   * Hash len bytes starting at data with an explicit seed.
+   *
+   * Each full 8-byte block is mixed into the running hash. The remaining
+   * 0-7 tail bytes are folded little-endian into one last block. Finally the
+   * length is xor-ed in and fmix64 finalizes the result.
+   */
+  uint64_t Murmur3::hash64(const uint8_t *data, uint32_t len, uint32_t seed) {
+    const uint64_t c1 = 0x87c37b91114253d5ULL;
+    const uint64_t c2 = 0x4cf5ad432745937fULL;
+
+    uint64_t h = seed;
+    const uint32_t blocks = len >> 3;
+
+    // body: full 8-byte blocks
+    for (uint32_t i = 0; i < blocks; ++i) {
+      uint64_t k = readLittleEndian64(data + (i << 3));
+      k *= c1;
+      k = rotl64(k, 31);
+      k *= c2;
+
+      h ^= k;
+      h = rotl64(h, 27);
+      h = h * 5 + 0x52dce729;
+    }
+
+    // tail: the remaining 0..7 bytes, byte j landing in bits [8j, 8j+8).
+    // A loop replaces the old fallthrough switch, so no warning suppression
+    // is needed.
+    const uint32_t tailStart = blocks << 3;
+    const uint32_t tailLen = len - tailStart;
+    if (tailLen > 0) {
+      uint64_t k = 0;
+      for (uint32_t j = 0; j < tailLen; ++j) {
+        k ^= static_cast<uint64_t>(data[tailStart + j]) << (8 * j);
+      }
+      k *= c1;
+      k = rotl64(k, 31);
+      k *= c2;
+      h ^= k;
+    }
+
+    // finalization
+    h ^= len;
+    return fmix64(h);
+  }
+
+}
diff --git a/c++/src/Murmur3.hh b/c++/src/Murmur3.hh
new file mode 100644
index 0000000..0e7b1a2
--- /dev/null
+++ b/c++/src/Murmur3.hh
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MURMUR3_HH
+#define ORC_MURMUR3_HH
+
+#include "orc/orc-config.hh"
+
+namespace orc {
+
+  /**
+   * Stateless 64-bit Murmur3-style hashing helpers: the class holds only
+   * static functions and constants and is never instantiated.
+   * NOTE(review): added as part of bloom filter support; the consumer
+   * (BloomFilter.cc) is not visible here.
+   */
+  class Murmur3 {
+  private:
+    // final avalanche mix applied to the accumulated hash
+    static uint64_t fmix64(uint64_t value);
+
+    // hash `len` bytes at `data`, starting from an explicit seed
+    static uint64_t hash64(const uint8_t* data, uint32_t len, uint32_t seed);
+
+  public:
+    // seed used by the public hash64 overload
+    enum { DEFAULT_SEED = 104729 };
+
+    // presumably the hash substituted for null values — confirm at call sites
+    enum { NULL_HASHCODE = 2862933555777941757LL };
+
+    /**
+     * Hash `len` bytes starting at `data` using DEFAULT_SEED.
+     */
+    static uint64_t hash64(const uint8_t* data, uint32_t len);
+  };
+
+}
+
+#endif //ORC_MURMUR3_HH
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 8f29327..c61bb68 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-
+#include "BloomFilter.hh"
#include "Options.hh"
#include "Reader.hh"
#include "Statistics.hh"
@@ -1061,6 +1061,64 @@ namespace orc {
postscriptLength));
}
+  /**
+   * Load the BLOOM_FILTER_UTF8 streams of one stripe.
+   * @param stripeIndex index of the stripe in the file footer
+   * @param included column ids to load; an empty set loads every column
+   * @return map from column id to that column's bloom filter entries
+   * @throws std::logic_error if stripeIndex is out of range
+   * @throws ParseError if a bloom filter stream cannot be parsed, or if it
+   *         refers to a column the stripe footer has no encoding for
+   */
+  std::map<uint32_t, BloomFilterIndex>
+  ReaderImpl::getBloomFilters(uint32_t stripeIndex,
+                              const std::set<uint32_t>& included) const {
+    std::map<uint32_t, BloomFilterIndex> ret;
+
+    // find stripe info
+    if (stripeIndex >= static_cast<uint32_t>(footer->stripes_size())) {
+      throw std::logic_error("Illegal stripe index: " + std::to_string(stripeIndex));
+    }
+    // bind by reference: there is no need to copy the protobuf message
+    const proto::StripeInformation& currentStripeInfo =
+      footer->stripes(static_cast<int>(stripeIndex));
+    const proto::StripeFooter currentStripeFooter =
+      getStripeFooter(currentStripeInfo, *contents);
+
+    // Streams are read back to back from the stripe offset, so the running
+    // sum of the preceding stream lengths gives each stream's position.
+    uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
+    for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
+      const proto::Stream& stream = currentStripeFooter.streams(i);
+      uint32_t column = static_cast<uint32_t>(stream.column());
+      uint64_t length = static_cast<uint64_t>(stream.length());
+
+      // a bloom filter stream from a selected column is found
+      if (stream.kind() == proto::Stream_Kind_BLOOM_FILTER_UTF8 &&
+          (included.empty() || included.find(column) != included.end())) {
+
+        // The column encoding is looked up by the stream's column id. Reject
+        // a corrupt footer instead of indexing past the end of columns().
+        if (column >= static_cast<uint32_t>(currentStripeFooter.columns_size())) {
+          throw ParseError("Bloom filter stream references column " +
+                           std::to_string(column) +
+                           " but the stripe footer has only " +
+                           std::to_string(currentStripeFooter.columns_size()) +
+                           " column encodings");
+        }
+
+        std::unique_ptr<SeekableInputStream> pbStream =
+          createDecompressor(contents->compression,
+                             std::unique_ptr<SeekableInputStream>
+                             (new SeekableFileInputStream(contents->stream.get(),
+                                                          offset,
+                                                          length,
+                                                          *contents->pool)),
+                             contents->blockSize,
+                             *(contents->pool));
+
+        proto::BloomFilterIndex pbBFIndex;
+        if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
+          throw ParseError("Failed to parse BloomFilterIndex");
+        }
+
+        const proto::ColumnEncoding& encoding =
+          currentStripeFooter.columns(static_cast<int>(column));
+
+        // Build this column's result in place rather than copying it into
+        // the map. Clearing keeps the old overwrite semantics if a column
+        // were ever listed twice.
+        BloomFilterIndex& bfIndex = ret[column];
+        bfIndex.entries.clear();
+        for (int j = 0; j < pbBFIndex.bloomfilter_size(); j++) {
+          bfIndex.entries.push_back(BloomFilterUTF8Utils::deserialize(
+            stream.kind(), encoding, pbBFIndex.bloomfilter(j)));
+        }
+      }
+
+      offset += length;
+    }
+
+    return ret;
+  }
+
RowReader::~RowReader() {
// PASS
}
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 4efa894..75eb0bb 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -286,6 +286,9 @@ namespace orc {
uint64_t getMemoryUseByName(const std::list<std::string>& names, int stripeIx=-1) override;
uint64_t getMemoryUseByTypeId(const std::list<uint64_t>& include, int stripeIx=-1) override;
+
+ std::map<uint32_t, BloomFilterIndex>
+ getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
};
}// namespace
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 72f7ba7..826334b 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -38,6 +38,9 @@ namespace orc {
FileVersion fileVersion;
double dictionaryKeySizeThreshold;
bool enableIndex;
+ std::set<uint64_t> columnsUseBloomFilter;
+ double bloomFilterFalsePositiveProb;
+ BloomFilterVersion bloomFilterVersion;
WriterOptionsPrivate() :
fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
@@ -51,6 +54,8 @@ namespace orc {
errorStream = &std::cerr;
dictionaryKeySizeThreshold = 0.0;
enableIndex = true;
+ bloomFilterFalsePositiveProb = 0.05;
+ bloomFilterVersion = UTF8;
}
};
@@ -200,6 +205,32 @@ namespace orc {
return privateBits->dictionaryKeySizeThreshold > 0.0;
}
+  /**
+   * Select the column ids that get bloom filters. The given set replaces
+   * (does not merge with) any previously configured set.
+   */
+  WriterOptions& WriterOptions::setColumnsUseBloomFilter(
+      const std::set<uint64_t>& columns) {
+    std::set<uint64_t>& target = privateBits->columnsUseBloomFilter;
+    target = columns;
+    return *this;
+  }
+
+  /**
+   * Whether the given column id was selected via setColumnsUseBloomFilter.
+   */
+  bool WriterOptions::isColumnUseBloomFilter(uint64_t column) const {
+    // std::set keys are unique, so count() is either 0 or 1
+    return privateBits->columnsUseBloomFilter.count(column) != 0;
+  }
+
+  /**
+   * Set the bloom filter false positive probability (default 0.05).
+   * The value is stored as given; no range check is done here.
+   */
+  WriterOptions& WriterOptions::setBloomFilterFPP(double fpp) {
+    WriterOptionsPrivate& bits = *privateBits;
+    bits.bloomFilterFalsePositiveProb = fpp;
+    return *this;
+  }
+
+  /**
+   * The configured bloom filter false positive probability.
+   */
+  double WriterOptions::getBloomFilterFPP() const {
+    const WriterOptionsPrivate& bits = *privateBits;
+    return bits.bloomFilterFalsePositiveProb;
+  }
+
+  // There is deliberately no setter for the bloom filter version: only UTF8
+  // is supported for now, so the value set in WriterOptionsPrivate's
+  // constructor is always returned.
+  BloomFilterVersion WriterOptions::getBloomFilterVersion() const {
+    return privateBits->bloomFilterVersion;
+  }
+
Writer::~Writer() {
// PASS
}
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index db68578..11e8cf4 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -26,6 +26,7 @@ add_executable (orc-test
MemoryInputStream.cc
MemoryOutputStream.cc
TestBufferedOutputStream.cc
+ TestBloomFilter.cc
TestByteRle.cc
TestByteRLEEncoder.cc
TestColumnPrinter.cc
diff --git a/c++/test/TestBloomFilter.cc b/c++/test/TestBloomFilter.cc
new file mode 100644
index 0000000..c769f51
--- /dev/null
+++ b/c++/test/TestBloomFilter.cc
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BloomFilter.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/gmock.h"
+#include "wrap/gtest-wrapper.h"
+
+namespace orc {
+
+  // BitSet equality depends on both size and contents; get() reflects set()
+  TEST(TestBloomFilter, testBitSetEqual) {
+    BitSet bitSet64_1(64), bitSet64_2(64), bitSet128(128);
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+    // a 64-bit and a 128-bit empty set are not equal
+    EXPECT_FALSE(bitSet64_1 == bitSet128);
+
+    bitSet64_1.set(6U);
+    bitSet64_1.set(16U);
+    bitSet64_1.set(26U);
+    bitSet64_2.set(6U);
+    bitSet64_2.set(16U);
+    bitSet64_2.set(26U);
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+    // Check that the bits are actually set. Comparing get() results between
+    // the two sets would also pass if neither bit were set.
+    EXPECT_TRUE(bitSet64_1.get(6U));
+    EXPECT_TRUE(bitSet64_1.get(16U));
+    EXPECT_TRUE(bitSet64_1.get(26U));
+    EXPECT_TRUE(bitSet64_2.get(6U));
+    EXPECT_TRUE(bitSet64_2.get(16U));
+    EXPECT_TRUE(bitSet64_2.get(26U));
+
+    bitSet64_1.set(36U);
+    bitSet64_2.set(46U);
+    EXPECT_FALSE(bitSet64_1 == bitSet64_2);
+    EXPECT_TRUE(bitSet64_1.get(36U));
+    EXPECT_TRUE(bitSet64_2.get(46U));
+    // each set only received its own extra bit
+    EXPECT_FALSE(bitSet64_1.get(46U));
+    EXPECT_FALSE(bitSet64_2.get(36U));
+
+    bitSet64_1.clear();
+    bitSet64_2.clear();
+    EXPECT_TRUE(bitSet64_1 == bitSet64_2);
+  }
+
+  // ported from Java ORC
+  TEST(TestBloomFilter, testSetGetBitSet) {
+    BitSet bits(128);
+
+    // lower 64 bits: a diagonal pattern, every 9th bit (0, 9, ..., 63)
+    for (uint64_t pos = 0; pos < 64; pos += 9) {
+      bits.set(pos);
+    }
+
+    // upper 64 bits: the complement of that diagonal; in the n-th byte every
+    // bit except bit n is set
+    for (uint64_t byteIdx = 0; byteIdx < 8; ++byteIdx) {
+      for (uint64_t bit = 0; bit < 8; ++bit) {
+        if (bit == byteIdx) {
+          continue;
+        }
+        bits.set(64 + byteIdx * 8 + bit);
+      }
+    }
+
+    for (uint64_t pos = 0; pos < 128; ++pos) {
+      const bool expected = pos < 64 ? (pos % 9 == 0)
+                                     : ((pos % 8) != (pos - 64) / 8);
+      EXPECT_EQ(expected, bits.get(pos));
+    }
+
+    // the backing 64-bit words must hold the two patterns above
+    const uint64_t * words = bits.getData();
+    EXPECT_EQ(128, bits.bitSize());
+    EXPECT_EQ(0x8040201008040201L, words[0]);
+    EXPECT_EQ(~0x8040201008040201L, words[1]);
+  }
+
+  // For each value kind: absent from a reset filter, present once added.
+  TEST(TestBloomFilter, testBloomFilterBasicOperations) {
+    BloomFilterImpl bloomFilter(128);
+
+    // test integers
+    const int64_t longValues[] = {1, 11, 111, 1111, 0, -1, -11, -111, -1111};
+    bloomFilter.reset();
+    for (int64_t v : longValues) {
+      EXPECT_FALSE(bloomFilter.testLong(v));
+    }
+    for (int64_t v : longValues) {
+      bloomFilter.addLong(v);
+    }
+    for (int64_t v : longValues) {
+      EXPECT_TRUE(bloomFilter.testLong(v));
+    }
+
+    // test doubles
+    const double doubleValues[] = {1.1, 11.11, 111.111, 1111.1111, 0.0,
+                                   -1.1, -11.11, -111.111, -1111.1111};
+    bloomFilter.reset();
+    for (double v : doubleValues) {
+      EXPECT_FALSE(bloomFilter.testDouble(v));
+    }
+    for (double v : doubleValues) {
+      bloomFilter.addDouble(v);
+    }
+    for (double v : doubleValues) {
+      EXPECT_TRUE(bloomFilter.testDouble(v));
+    }
+
+    // test strings: empty, ASCII and multi-byte UTF-8
+    const char * strValues[] = {u8"", u8"english", u8"中国字"};
+    bloomFilter.reset();
+    for (const char * s : strValues) {
+      EXPECT_FALSE(bloomFilter.testBytes(s, static_cast<int64_t>(strlen(s))));
+    }
+    for (const char * s : strValues) {
+      bloomFilter.addBytes(s, static_cast<int64_t>(strlen(s)));
+    }
+    for (const char * s : strValues) {
+      EXPECT_TRUE(bloomFilter.testBytes(s, static_cast<int64_t>(strlen(s))));
+    }
+  }
+
+  // A filter must survive a serialize/deserialize round trip unchanged.
+  TEST(TestBloomFilter, testBloomFilterSerialization) {
+    // filters differing in size or false positive probability are unequal
+    BloomFilterImpl emptyFilter1(128), emptyFilter2(256);
+    EXPECT_FALSE(emptyFilter1 == emptyFilter2);
+
+    BloomFilterImpl emptyFilter3(128, 0.05), emptyFilter4(128, 0.01);
+    EXPECT_FALSE(emptyFilter3 == emptyFilter4);
+
+    const int64_t values[] = {1, 11, 111, 1111, 0, -1, -11, -111, -1111};
+    BloomFilterImpl srcBloomFilter(64);
+    for (int64_t v : values) {
+      srcBloomFilter.addLong(v);
+    }
+
+    proto::BloomFilter pbBloomFilter;
+    proto::ColumnEncoding encoding;
+    // use the named version, as ColumnWriter does, instead of a magic number
+    encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+
+    // serialize
+    BloomFilterUTF8Utils::serialize(srcBloomFilter, pbBloomFilter);
+
+    // deserialize
+    std::shared_ptr<BloomFilter> dstBloomFilter = BloomFilterUTF8Utils::deserialize(
+      proto::Stream_Kind_BLOOM_FILTER_UTF8, encoding, pbBloomFilter);
+
+    // Fail the test cleanly instead of throwing std::bad_cast (reference
+    // dynamic_cast) or dereferencing null when the result is unexpected.
+    ASSERT_TRUE(dstBloomFilter != nullptr);
+    BloomFilterImpl* dstImpl = dynamic_cast<BloomFilterImpl*>(dstBloomFilter.get());
+    ASSERT_TRUE(dstImpl != nullptr);
+
+    EXPECT_TRUE(srcBloomFilter == *dstImpl);
+    for (int64_t v : values) {
+      EXPECT_TRUE(dstBloomFilter->testLong(v));
+    }
+  }
+
+}
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 12f2ed3..930becc 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1601,5 +1601,83 @@ namespace orc {
}
}
+  // Write one distinct value per row group into a bigint and a string
+  // column with bloom filters enabled, then check each row group's filter.
+  TEST_P(WriterTest, testBloomFilter) {
+    WriterOptions options;
+    options.setStripeSize(1024)
+      .setCompressionBlockSize(64)
+      .setCompression(CompressionKind_ZSTD)
+      .setMemoryPool(getDefaultPool())
+      .setRowIndexStride(10000)
+      .setFileVersion(fileVersion)
+      .setColumnsUseBloomFilter({1, 2});
+
+    // write 65535 rows of data
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool * pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+      "struct<c1:bigint,c2:string>"));
+
+    const uint64_t rowCount = 65535;
+    const uint64_t stride = options.getRowIndexStride();
+    // one bloom filter entry per row group; round up for a partial last group
+    const uint64_t rowGroupCount = (rowCount + stride - 1) / stride;
+
+    // Heap-backed storage for the string column. It replaces a ~320 KiB
+    // stack array. The vector is sized once, so its elements never move and
+    // the char pointers handed to strBatch stay valid until the batch is
+    // written.
+    std::vector<std::string> strValues(static_cast<size_t>(rowCount));
+
+    std::unique_ptr<Writer> writer = createWriter(*type, &memStream, options);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount);
+    StructVectorBatch& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+    LongVectorBatch& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+    StringVectorBatch& strBatch = dynamic_cast<StringVectorBatch&>(*structBatch.fields[1]);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      // each row group has a unique value
+      uint64_t data = i / stride;
+
+      // c1
+      longBatch.data[i] = static_cast<int64_t>(data);
+
+      // c2
+      strValues[i] = std::to_string(data);
+      strBatch.data[i] = &strValues[i][0];
+      strBatch.length[i] = static_cast<int64_t>(strValues[i].size());
+    }
+
+    structBatch.numElements = rowCount;
+    longBatch.numElements = rowCount;
+    strBatch.numElements = rowCount;
+    writer->add(*batch);
+    writer->close();
+
+    // verify bloomfilters
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream(
+      memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    EXPECT_EQ(2, reader->getBloomFilters(0).size());
+    EXPECT_EQ(1, reader->getBloomFilters(0, {1}).size());
+    EXPECT_EQ(1, reader->getBloomFilters(0, {2}).size());
+
+    std::map<uint32_t, BloomFilterIndex> bfs = reader->getBloomFilters(0, {1, 2});
+    EXPECT_EQ(2, bfs.size());
+    // ASSERT rather than EXPECT: the loop below indexes entries[rg] and must
+    // not run past the end if fewer entries were written.
+    ASSERT_EQ(rowGroupCount, bfs[1].entries.size());
+    ASSERT_EQ(rowGroupCount, bfs[2].entries.size());
+
+    const BloomFilterIndex& longFilters = bfs[1];
+    const BloomFilterIndex& strFilters = bfs[2];
+
+    // test bloomfilters: each row group contains exactly its own value
+    for (uint64_t rg = 0; rg < rowGroupCount; ++rg) {
+      for (uint64_t value = 0; value <= 100; ++value) {
+        std::string str = std::to_string(value);
+        if (value == rg) {
+          EXPECT_TRUE(longFilters.entries[rg]->testLong(static_cast<int64_t>(value)));
+          EXPECT_TRUE(strFilters.entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size())));
+        } else {
+          EXPECT_FALSE(longFilters.entries[rg]->testLong(static_cast<int64_t>(value)));
+          EXPECT_FALSE(strFilters.entries[rg]->testBytes(str.c_str(), static_cast<int64_t>(str.size())));
+        }
+      }
+    }
+  }
+
INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
}