You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/07/14 13:11:50 UTC

[impala] 02/03: IMPALA-10642: Write support for Parquet Bloom filters - most common types

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a5de2acc47723fdaee4ebe6d904d16be505b7cfb
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Thu Mar 18 17:10:49 2021 +0100

    IMPALA-10642: Write support for Parquet Bloom filters - most common types
    
    This change adds support for writing Parquet Bloom filters for the types
    for which read support was added in IMPALA-10640.
    
    Writing of Parquet Bloom filters can be controlled by the
    'parquet_bloom_filter_write' query option and the
    'parquet.bloom.filter.columns' table property. The query option has the
    following possible values:
      NEVER      - never write Parquet Bloom filters
      IF_NO_DICT - write Parquet Bloom filters if specified in the table
                   properties AND if the row group is not fully
                   dictionary encoded (the number of distinct values exceeds
                   the maximum dictionary size)
      ALWAYS     - always write Parquet Bloom filters if specified in the
                   table properties, even if the row group is fully
                   dictionary encoded
    
    The 'parquet.bloom.filter.columns' table property is a comma separated
    list of 'col_name:bytes' pairs. The 'bytes' part means the size of the
    bitset of the Bloom filter, and is optional. If the size is not given,
    it will be the maximal Bloom filter size
    (ParquetBloomFilter::MAX_BYTES).
    Example: "col1:1024,col2,col4:100".
    
    Testing:
      - Added a test in tests/query_test/test_parquet_bloom_filter.py that
        uses Impala to write the same table as in the test file
        'testdata/data/parquet-bloom-filtering.parquet' and checks whether
        the Parquet Bloom filter header and bitset are identical.
      - 'test_fallback_from_dict' tests falling back from dict encoding to
        plain and using Bloom filters.
      - 'test_fallback_from_dict_if_no_bloom_tbl_props' tests falling back
        from dict encoding to plain when Bloom filters are NOT enabled.
    
    Change-Id: Ie865efd4f0c11b9e111fb94f77d084bf6ee20792
    Reviewed-on: http://gerrit.cloudera.org:8080/17262
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc                     |   4 +
 be/src/exec/hdfs-table-sink.h                      |   8 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   2 +-
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 247 ++++++++++++++++
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   5 +
 be/src/exec/parquet/parquet-bloom-filter-util.cc   |  10 +
 be/src/exec/parquet/parquet-bloom-filter-util.h    |   3 +
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   4 +-
 be/src/util/CMakeLists.txt                         |  26 ++
 be/src/util/debug-util.cc                          |   2 +-
 be/src/util/debug-util.h                           |   1 +
 be/src/util/dict-encoding.h                        |  14 +
 be/src/util/parquet-bloom-filter-avx2.cc           |  92 ++++++
 be/src/util/parquet-bloom-filter-test.cc           |   4 +-
 be/src/util/parquet-bloom-filter.cc                |  85 +++---
 be/src/util/parquet-bloom-filter.h                 |  57 ++--
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/ImpalaService.thrift                 |   9 +
 common/thrift/Query.thrift                         |  18 ++
 .../org/apache/impala/planner/HdfsTableSink.java   |  79 ++++++
 .../ParquetBloomFilterTblPropParserTest.java       |  88 ++++++
 tests/query_test/test_parquet_bloom_filter.py      | 310 +++++++++++++++++++--
 23 files changed, 997 insertions(+), 82 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 7d7841f..02099fb 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -104,6 +104,10 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sin
   if (hdfs_sink.__isset.external_output_partition_depth) {
     external_output_partition_depth_ = hdfs_sink.external_output_partition_depth;
   }
+
+  if (hdfs_sink.__isset.parquet_bloom_filter_col_info) {
+    parquet_bloom_filter_columns_ = hdfs_sink.parquet_bloom_filter_col_info;
+  }
 }
 
 Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2263531..13d4486 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -115,6 +115,10 @@ class HdfsTableSink : public DataSink {
   TSortingOrder::type sorting_order() const { return sorting_order_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
+  /// Returns the map from column names to Parquet Bloom filter bitset sizes.
+  const std::map<string, int64_t>& GetParquetBloomFilterColumns() const {
+    return parquet_bloom_filter_columns_;
+  }
+
   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
   RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; }
   RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
@@ -274,6 +278,10 @@ class HdfsTableSink : public DataSink {
   // sink which directories are pre-created.
   int external_output_partition_depth_ = 0;
 
+  /// Map from column names to Parquet Bloom filter bitset sizes. Columns for which
+  /// Parquet Bloom filtering is not enabled are not listed.
+  std::map<std::string, int64_t> parquet_bloom_filter_columns_;
+
   /// string representation of the unique fragment instance id. Used for per-partition
   /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare();
   std::string unique_id_str_;
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index ff34eee..f804e36 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2065,7 +2065,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup&
 
       // Construct ParquetBloomFilter instance.
       ParquetBloomFilter bloom_filter;
-      RETURN_IF_ERROR(bloom_filter.Init(data_buffer.buffer(), data_buffer.Size()));
+      RETURN_IF_ERROR(bloom_filter.Init(data_buffer.buffer(), data_buffer.Size(), false));
 
       const uint64_t hash = col_idx_to_hash.second;
       if (!bloom_filter.Find(hash)) {
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 2069f42..01ab616 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -17,12 +17,14 @@
 
 #include "exec/parquet/hdfs-parquet-table-writer.h"
 
+#include <boost/algorithm/string.hpp>
 #include <boost/unordered_set.hpp>
 
 #include "common/version.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/parquet/parquet-column-stats.inline.h"
 #include "exec/parquet/parquet-metadata-utils.h"
+#include "exec/parquet/parquet-bloom-filter-util.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "rpc/thrift-util.h"
@@ -32,6 +34,7 @@
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/scoped-buffer.h"
 #include "runtime/string-value.inline.h"
 #include "util/bit-stream-utils.h"
 #include "util/bit-util.h"
@@ -40,11 +43,13 @@
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
+#include "util/parquet-bloom-filter.h"
 #include "util/pretty-printer.h"
 #include "util/rle-encoding.h"
 #include "util/string-util.h"
 
 #include <sstream>
+#include <string>
 
 #include "gen-cpp/ImpalaService_types.h"
 
@@ -307,6 +312,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // the expression to an int64.
   virtual void* ConvertValue(void* value) { return value; }
 
+  // Some subclasses may write a ParquetBloomFilter, in which case they should override
+  // this method.
+  virtual const ParquetBloomFilter* GetParquetBloomFilter() const {
+    return nullptr;
+  }
+
   // Encodes out all data for the current page and updates the metadata.
   virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
@@ -430,9 +441,34 @@ class HdfsParquetTableWriter::ColumnWriter :
       const Codec::CodecInfo& codec_info, const std::string& col_name)
     : BaseColumnWriter(parent, eval, codec_info, col_name),
       num_values_since_dict_size_check_(0),
+      parquet_bloom_filter_bytes_(0),
+      parquet_bloom_filter_buffer_(table_sink_mem_tracker_),
       plain_encoded_value_size_(
           ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
     DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
+
+    const std::map<string, int64_t>& col_to_size =
+      parent->parent_->GetParquetBloomFilterColumns();
+    const auto it = col_to_size.find(column_name());
+
+    if (GetBloomFilterWriteOption() == TParquetBloomFilterWrite::NEVER
+        || it == col_to_size.end()) {
+      // Parquet Bloom filtering is disabled altogether or is not turned on for this
+      // column.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::DISABLED;
+    } else {
+      // Parquet Bloom filtering is enabled for this column either immediately or if
+      // falling back from dict encoding.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::UNINITIALIZED;
+
+      // It is the responsibility of the FE to enforce the below constraints.
+      parquet_bloom_filter_bytes_ = it->second;
+      DCHECK_LE(parquet_bloom_filter_bytes_, ParquetBloomFilter::MAX_BYTES);
+      DCHECK_GE(parquet_bloom_filter_bytes_, ParquetBloomFilter::MIN_BYTES);
+      DCHECK(BitUtil::IsPowerOf2(parquet_bloom_filter_bytes_));
+    }
+    parquet_type_ = ParquetMetadataUtils::ConvertInternalToParquetType(type().type,
+        parent_->timestamp_type_);
   }
 
   virtual void Reset() {
@@ -450,6 +486,16 @@ class HdfsParquetTableWriter::ColumnWriter :
     page_stats_base_ = page_stats_.get();
     row_group_stats_.reset(
         new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
+    if (parquet_bloom_filter_state_ != ParquetBloomFilterState::DISABLED) {
+      Status status = InitParquetBloomFilter();
+      if (!status.ok()) {
+        VLOG(google::WARNING)
+            << "Failed to initialise Parquet Bloom filter for column "
+            << column_name() << "."
+            << " Error message: " << status.msg().msg();
+        ReleaseParquetBloomFilterResources();
+      }
+    }
     row_group_stats_base_ = row_group_stats_.get();
   }
 
@@ -467,6 +513,7 @@ class HdfsParquetTableWriter::ColumnWriter :
       // If the dictionary contains the maximum number of values, switch to plain
       // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
+        FlushDictionaryToParquetBloomFilterIfNeeded();
         next_page_encoding_ = parquet::Encoding::PLAIN;
         return false;
       }
@@ -497,9 +544,15 @@ class HdfsParquetTableWriter::ColumnWriter :
     }
 
     page_stats_->Update(*val);
+    UpdateParquetBloomFilterIfNeeded(val);
+
     return true;
   }
 
+  // Returns the Parquet Bloom filter held by this column writer, or nullptr if it does
+  // not currently hold one.
+  virtual const ParquetBloomFilter* GetParquetBloomFilter() const {
+    return parquet_bloom_filter_.get();
+  }
+
  private:
   // The period, in # of rows, to check the estimated dictionary page size against
   // the data page size. We want to start a new data page when the estimated size
@@ -525,10 +578,163 @@ class HdfsParquetTableWriter::ColumnWriter :
   // Tracks statistics per row group. This gets reset when starting a new row group.
   scoped_ptr<ColumnStats<T>> row_group_stats_;
 
+
+  /// Lifecycle state of this column writer's Parquet Bloom filter. Values are only
+  /// inserted into the filter while the state is ENABLED.
+  enum struct ParquetBloomFilterState {
+    /// Parquet Bloom filtering is turned off either completely or for this column.
+    DISABLED,
+
+    /// The Parquet Bloom filter needs to be initialised before being used.
+    UNINITIALIZED,
+
+    /// The Bloom filter has been initialised but it is not being used as the dictionary
+    /// can hold all elements. If the dictionary becomes full and there are still new
+    /// elements, we fall back from dictionary encoding to plain encoding and start using
+    /// the Bloom filter.
+    WAIT_FOR_FALLBACK_FROM_DICT,
+
+    /// The Parquet Bloom filter is being used.
+    ENABLED,
+
+    /// An error occurred with the Parquet Bloom filter so we are not using it.
+    FAILED
+  };
+
+  ParquetBloomFilterState parquet_bloom_filter_state_;
+  uint64_t parquet_bloom_filter_bytes_;
+  ScopedBuffer parquet_bloom_filter_buffer_;
+
+  // The parquet type corresponding to this->type(). Needed by the Parquet Bloom filter.
+  parquet::Type::type parquet_type_;
+
+  // Buffer used when converting values to the form that is used for hashing and insertion
+  // into the ParquetBloomFilter. The conversion function, 'BytesToParquetType' requires a
+  // vector to be able to allocate space if necessary. However, by caching the allocated
+  // buffer here we avoid the overhead of allocation for every conversion - when
+  // 'BytesToParquetType' calls 'resize' on the vector it will already have at least the
+  // desired length in most (or all) cases.
+  //
+  // We preallocate 16 bytes because that is the longest fixed size type (except for fixed
+  // length arrays). The (count, value) constructor must be spelled with parentheses:
+  // brace initialisation '{16, 0}' would select the initializer-list constructor and
+  // create a vector holding just the two elements 16 and 0.
+  std::vector<uint8_t> parquet_bloom_conversion_buffer = std::vector<uint8_t>(16, 0);
+
+  // The ParquetBloomFilter object if one is being written. If
+  // 'ShouldInitParquetBloomFilter()' is false, the combination of the impala type and the
+  // parquet type is not supported or some error occurs during the initialisation of the
+  // ParquetBloomFilter object, it is set to NULL.
+  unique_ptr<ParquetBloomFilter> parquet_bloom_filter_;
+
   // Converts a slot pointer to a raw value suitable for encoding
   inline T* CastValue(void* value) {
     return reinterpret_cast<T*>(value);
   }
+
+  // (Re)creates the Parquet Bloom filter of this column: allocates and zeroes a buffer
+  // of 'parquet_bloom_filter_bytes_' bytes and initialises 'parquet_bloom_filter_' on
+  // top of it. Must not be called in the DISABLED state.
+  //
+  // On success the state becomes WAIT_FOR_FALLBACK_FROM_DICT (if the write option is
+  // IF_NO_DICT and the current encoding is a dictionary encoding) or ENABLED. On every
+  // failure path the state becomes FAILED and a non-OK status is returned; this function
+  // does not release 'parquet_bloom_filter_' or its buffer in that case.
+  Status InitParquetBloomFilter() WARN_UNUSED_RESULT {
+    DCHECK(parquet_bloom_filter_state_ != ParquetBloomFilterState::DISABLED);
+    const ColumnType& impala_type = type();
+    if (!IsParquetBloomFilterSupported(parquet_type_, impala_type)) {
+      // Mark the filter as unusable, consistently with the other failure paths below.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+      stringstream ss;
+      ss << "Parquet Bloom filtering not supported for parquet type " << parquet_type_
+          << " and impala type " << impala_type << ".";
+      return Status::Expected(ss.str());
+    }
+
+    // Release any previously allocated buffer before allocating a fresh one.
+    parquet_bloom_filter_buffer_.Release();
+    if (!parquet_bloom_filter_buffer_.TryAllocate(parquet_bloom_filter_bytes_)) {
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
+            "Bloom filter data when writing column '$1'.",
+            parquet_bloom_filter_bytes_, column_name()));
+    }
+    // Start from an all-zero bitset.
+    std::memset(parquet_bloom_filter_buffer_.buffer(), 0,
+        parquet_bloom_filter_buffer_.Size());
+
+    parquet_bloom_filter_ = make_unique<ParquetBloomFilter>();
+    Status status = parquet_bloom_filter_->Init(parquet_bloom_filter_buffer_.buffer(),
+        parquet_bloom_filter_buffer_.Size(), true);
+    if (!status.ok()) {
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+      return status;
+    }
+
+    // With IF_NO_DICT the filter is not filled while dictionary encoding is in use; it
+    // is only activated when falling back to plain encoding.
+    const bool should_wait_for_fallback =
+        (GetBloomFilterWriteOption() == TParquetBloomFilterWrite::IF_NO_DICT)
+        && IsDictionaryEncoding(current_encoding_);
+    parquet_bloom_filter_state_ = should_wait_for_fallback
+        ? ParquetBloomFilterState::WAIT_FOR_FALLBACK_FROM_DICT
+        : ParquetBloomFilterState::ENABLED;
+    return Status::OK();
+  }
+
+  // Inserts 'val' into the Parquet Bloom filter if the filter is in the ENABLED state;
+  // does nothing in any other state. If the insertion fails, the failure is logged, the
+  // state becomes FAILED and the filter's resources are released.
+  void UpdateParquetBloomFilterIfNeeded(const void* val) {
+    if (parquet_bloom_filter_state_ == ParquetBloomFilterState::ENABLED) {
+      Status status = UpdateParquetBloomFilter(val);
+      if (!status.ok()) {
+        // If an error happens, for example conversion to the form expected by the Bloom
+        // filter fails, we stop writing the Bloom filter and release resources associated
+        // with it.
+        // NOTE(review): VLOG() takes a numeric verbosity level, not a severity, so
+        // 'google::WARNING' is presumably interpreted as a verbosity level here rather
+        // than producing a WARNING-severity message - confirm whether LOG(WARNING) was
+        // intended.
+        VLOG(google::WARNING)
+            << "An error happened updating Parquet Bloom filter in column "
+            << column_name() << " at row idx " << parent_->row_idx_ << "."
+            << " Error message: " << status.msg().msg();
+        parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+        ReleaseParquetBloomFilterResources();
+      }
+    }
+  }
+
+  // Converts 'val' with 'BytesToParquetType()' (using 'parquet_bloom_conversion_buffer'
+  // as scratch space) and hashes and inserts the converted bytes into
+  // 'parquet_bloom_filter_'. Must only be called in the ENABLED state with a non-null
+  // filter. Returns the error of the conversion if it fails, OK otherwise.
+  Status UpdateParquetBloomFilter(const void* val) WARN_UNUSED_RESULT {
+    DCHECK(parquet_bloom_filter_state_ == ParquetBloomFilterState::ENABLED);
+    DCHECK(parquet_bloom_filter_ != nullptr);
+
+    // Sentinel values that the conversion is expected to overwrite. 'len' is unsigned,
+    // so -1 wraps around to the largest 'size_t' value.
+    uint8_t* ptr = nullptr;
+    size_t len = -1;
+    const ColumnType& impala_type = type();
+    RETURN_IF_ERROR(BytesToParquetType(val, impala_type, parquet_type_,
+        &parquet_bloom_conversion_buffer, &ptr, &len));
+    DCHECK(ptr != nullptr);
+    DCHECK(len != -1);
+    parquet_bloom_filter_->HashAndInsert(ptr, len);
+
+    return Status::OK();
+  }
+
+  // Destroys the ParquetBloomFilter object and releases its backing buffer. Does not
+  // modify 'parquet_bloom_filter_state_'.
+  void ReleaseParquetBloomFilterResources() {
+    parquet_bloom_filter_ = nullptr;
+    parquet_bloom_filter_buffer_.Release();
+  }
+
+  // If the Bloom filter has been waiting for a fallback from dictionary encoding
+  // (state WAIT_FOR_FALLBACK_FROM_DICT), switches it to ENABLED and inserts all keys
+  // currently in the dictionary into it. Does nothing in any other state. If inserting
+  // the keys fails, the failure is logged, the state becomes FAILED and the filter's
+  // resources are released.
+  void FlushDictionaryToParquetBloomFilterIfNeeded() {
+    if (parquet_bloom_filter_state_
+        == ParquetBloomFilterState::WAIT_FOR_FALLBACK_FROM_DICT) {
+      // The state has to be ENABLED before the keys are inserted because
+      // 'UpdateParquetBloomFilter()' DCHECKs it.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::ENABLED;
+
+      // Write dictionary keys to Parquet Bloom filter if we haven't been filling it so
+      // far (and Bloom filtering is enabled). If there are too many values for a
+      // dictionary, a Bloom filter may still be useful.
+      Status status = DictKeysToParquetBloomFilter();
+      if (!status.ok()) {
+        VLOG(google::WARNING)
+            << "Failed to add dictionary keys to Parquet Bloom filter for column "
+            << column_name()
+            << " when falling back from dictionary encoding to plain encoding."
+            << " Error message: " << status.msg().msg();
+        parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+        ReleaseParquetBloomFilterResources();
+      }
+    }
+  }
+
+  // Inserts every key of the dictionary encoder into the Parquet Bloom filter by calling
+  // 'UpdateParquetBloomFilter()' on each of them. Returns the status reported by
+  // 'ForEachDictKey()'.
+  Status DictKeysToParquetBloomFilter() {
+    return dict_encoder_->ForEachDictKey([this](const T& value) {
+        return UpdateParquetBloomFilter(&value);
+        });
+  }
+
+  // Returns the value of the 'parquet_bloom_filter_write' query option of the query
+  // this writer belongs to.
+  TParquetBloomFilterWrite::type GetBloomFilterWriteOption() {
+    return parent_->state_->query_options().parquet_bloom_filter_write;
+  }
+
  protected:
   // Size of each encoded value in plain encoding. -1 if the type is variable-length.
   int64_t plain_encoded_value_size_;
@@ -1386,6 +1592,43 @@ Status HdfsParquetTableWriter::WriteFileHeader() {
   return Status::OK();
 }
 
+// Writes the Parquet Bloom filter of 'col_writer' (serialised thrift header followed by
+// the bitset) at the current file position and records that position as the Bloom
+// filter offset in 'meta_data'. Returns OK without writing anything if the column writer
+// has no Bloom filter or the filter is empty. 'file_pos_' is advanced by the number of
+// bytes written.
+Status HdfsParquetTableWriter::WriteParquetBloomFilter(BaseColumnWriter* col_writer,
+    parquet::ColumnMetaData* meta_data) {
+  DCHECK(col_writer != nullptr);
+  DCHECK(meta_data != nullptr);
+
+  const ParquetBloomFilter* bloom_filter = col_writer->GetParquetBloomFilter();
+  if (bloom_filter == nullptr || bloom_filter->AlwaysFalse()) {
+    // If there is no Bloom filter for this column or if it is empty we don't need to do
+    // anything.
+    // If bloom_filter->AlwaysFalse() is true, it means the Bloom filter was initialised
+    // but no element was inserted, probably because we have not fallen back to plain
+    // encoding from dictionary encoding.
+    return Status::OK();
+  }
+
+  // Update metadata. This has to happen before 'file_pos_' is advanced below, as the
+  // offset must point to the start of the Bloom filter header.
+  meta_data->__set_bloom_filter_offset(file_pos_);
+
+  // Write the header to the file.
+  parquet::BloomFilterHeader header = CreateBloomFilterHeader(*bloom_filter);
+  uint8_t* buffer = nullptr;
+  uint32_t len = 0;
+  RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(&header, &len, &buffer));
+  DCHECK(buffer != nullptr);
+  DCHECK_GT(len, 0);
+  RETURN_IF_ERROR(Write(buffer, len));
+  file_pos_ += len;
+
+  // Write the Bloom filter directory (bitset) to the file.
+  const uint8_t* directory = bloom_filter->directory();
+  const int64_t directory_size = bloom_filter->directory_size();
+  RETURN_IF_ERROR(Write(directory, directory_size));
+  file_pos_ += directory_size;
+
+  return Status::OK();
+}
+
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   if (current_row_group_ == nullptr) return Status::OK();
 
@@ -1442,6 +1685,10 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     // Build column statistics and add them to the header.
     col_writer->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
 
+    // Write Bloom filter and update metadata.
+    RETURN_IF_ERROR(WriteParquetBloomFilter(col_writer,
+        &current_row_group_->columns[i].meta_data));
+
     // Since we don't supported complex schemas, all columns should have the same
     // number of values.
     DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index aadad1f..cdc6c08 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -149,6 +149,11 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// Writes the file metadata and footer.
   Status WriteFileFooter();
 
+  /// Writes the ParquetBloomFilter of 'col_writer' if it has one, including the header,
+  /// and updates '*meta_data'.
+  Status WriteParquetBloomFilter(BaseColumnWriter* col_writer,
+      parquet::ColumnMetaData* meta_data) WARN_UNUSED_RESULT;
+
   /// Flushes the current row group to file.  This will compute the final
   /// offsets of column chunks, updating the file metadata.
   Status FlushCurrentRowGroup();
diff --git a/be/src/exec/parquet/parquet-bloom-filter-util.cc b/be/src/exec/parquet/parquet-bloom-filter-util.cc
index 08893c2..002c98f 100644
--- a/be/src/exec/parquet/parquet-bloom-filter-util.cc
+++ b/be/src/exec/parquet/parquet-bloom-filter-util.cc
@@ -216,4 +216,14 @@ Status LiteralToParquetType(const Literal& literal, ScalarExprEvaluator* eval,
   return BytesToParquetType(value, literal.type(), parquet_type, storage, ptr, len);
 }
 
+// Builds the thrift header describing 'bloom_filter': the BLOCK (split block)
+// algorithm, the XXHASH hash function, UNCOMPRESSED compression, and 'numBytes' set to
+// the size of the filter's directory.
+parquet::BloomFilterHeader CreateBloomFilterHeader(const ParquetBloomFilter& bloom_filter)
+{
+  parquet::BloomFilterHeader header;
+  header.algorithm.__set_BLOCK(parquet::SplitBlockAlgorithm());
+  header.hash.__set_XXHASH(parquet::XxHash());
+  header.compression.__set_UNCOMPRESSED(parquet::Uncompressed());
+  header.__set_numBytes(bloom_filter.directory_size());
+  return header;
+}
+
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-bloom-filter-util.h b/be/src/exec/parquet/parquet-bloom-filter-util.h
index 911be1b..a3f4c84 100644
--- a/be/src/exec/parquet/parquet-bloom-filter-util.h
+++ b/be/src/exec/parquet/parquet-bloom-filter-util.h
@@ -57,5 +57,8 @@ Status LiteralToParquetType(const Literal& literal, ScalarExprEvaluator* eval,
     const parquet::Type::type& parquet_type, vector<uint8_t>* storage,
     uint8_t** ptr, size_t* len) WARN_UNUSED_RESULT;
 
+/// Creates a 'parquet::BloomFilterHeader' object based on 'bloom_filter'.
+parquet::BloomFilterHeader CreateBloomFilterHeader(
+    const ParquetBloomFilter& bloom_filter);
 
 } // namespace impala
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5ba518a..e92e81f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -514,6 +514,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_parquet_bloom_filtering(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE: {
+        TParquetBloomFilterWrite::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "Parquet Bloom filter write",
+           _TParquetBloomFilterWrite_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_parquet_bloom_filter_write(enum_type);
+        break;
+      }
       case TImpalaQueryOptions::PARQUET_READ_STATISTICS: {
         query_options->__set_parquet_read_statistics(IsTrue(value));
         break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 943e906..65113cb 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS + 1);\
+      TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -257,6 +257,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 168112a..6c7643f 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -110,6 +110,32 @@ set(UTIL_SRCS
   ${MUSTACHE_SRC_DIR}/mustache.cc
   ${MPFIT_SRC_DIR}/mpfit.c)
 
+# Detect AVX2 support
+set(AVX2_CMD "echo | ${CMAKE_CXX_COMPILER} -mavx2 -dM -E - | awk '$2 == \"__AVX2__\" { print $3 }'")
+execute_process(
+  COMMAND bash -c ${AVX2_CMD}
+  OUTPUT_VARIABLE AVX2_SUPPORT
+  OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+# parquet-bloom-filter-avx2.cc uses AVX2 operations.
+if (AVX2_SUPPORT)
+  list(APPEND UTIL_SRCS parquet-bloom-filter-avx2.cc)
+
+  set_source_files_properties(parquet-bloom-filter-avx2.cc PROPERTIES COMPILE_FLAGS "-mavx2")
+  # parquet-bloom-filter.cc is not compiled explicitly with AVX2
+  # instructions(-mavx2) but it needs to know at compile time whether AVX2 support is
+  # available, hence the custom definition instead of relying on __AVX2__ defined by
+  # compiler with -mavx2.
+  # This is because it derives from Kudu code at
+  # be/src/kudu/util/block_bloom_filter_avx2.cc.
+  set_source_files_properties(parquet-bloom-filter-avx2.cc parquet-bloom-filter.cc
+                              PROPERTIES COMPILE_DEFINITIONS "USE_AVX2=1")
+  message("Compiler supports AVX2")
+else()
+  message("Compiler does not support AVX2")
+endif()
+
 add_library(Util ${UTIL_SRCS})
 add_dependencies(Util gen-deps gen_ir_descriptions)
 
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 8dd6814..2c3f5be 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -105,7 +105,7 @@ PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilteringLevel)
 PRINT_THRIFT_ENUM_IMPL(TKuduReplicaSelection)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilterFastCodePathMode)
-
+PRINT_THRIFT_ENUM_IMPL(TParquetBloomFilterWrite)
 
 string PrintId(const TUniqueId& id, const string& separator) {
   stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 2cd0ae3..edec713 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -83,6 +83,7 @@ std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
 std::string PrintThriftEnum(const TMinmaxFilteringLevel::type& value);
 std::string PrintThriftEnum(const TKuduReplicaSelection::type& value);
 std::string PrintThriftEnum(const TMinmaxFilterFastCodePathMode::type& value);
+std::string PrintThriftEnum(const TParquetBloomFilterWrite::type& value);
 
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
 std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index e6e01bc..3a33801 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -188,6 +188,20 @@ class DictEncoder : public DictEncoderBase {
 
   virtual int num_entries() const { return nodes_.size(); }
 
+  /// Execute 'func' for each key that is present in the dictionary. Stops execution the
+  /// first time 'func' returns an error, propagating the error. Returns OK otherwise.
+  ///
+  /// Can be useful if we fall back to plain encoding from dict encoding but still want to
+  /// use a Bloom filter. In this case the filter can be filled with all elements that
+  /// have occurred so far.
+  Status ForEachDictKey(const std::function<Status(const T&)>& func) {
+    // Iterate by const reference: the nodes are only read, so copying each of them is
+    // unnecessary.
+    for (const auto& node : nodes_) {
+      RETURN_IF_ERROR(func(node.value));
+    }
+
+    return Status::OK();
+  }
+
  private:
   /// Size of the table. Must be a power of 2.
   enum { HASH_TABLE_SIZE = 1 << 16 };
diff --git a/be/src/util/parquet-bloom-filter-avx2.cc b/be/src/util/parquet-bloom-filter-avx2.cc
new file mode 100644
index 0000000..52fb978
--- /dev/null
+++ b/be/src/util/parquet-bloom-filter-avx2.cc
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file is partially a copy of Kudu BlockBloomFilter code. We wanted to reuse the
+// existing implementation but also extend/modify some parts. This would not have been
+// possible without modifying the Kudu source code in Impala
+// (be/src/kudu/util/block_bloom_filter*). On the other hand, we have to maintain binary
+// compatibility between the Kudu code in Impala and actual Kudu code, so we decided
+// against modifying the code in be/src/kudu/util/block_bloom_filter*.
+
+// This file is conditionally compiled if compiler supports AVX2. However the tidy bot
+// appears to compile this file regardless and does not define the USE_AVX2 macro raising
+// incorrect errors.
+
+// This file is partially a copy of Kudu BlockBloomFilter code. We wanted to reuse the
+// existing implementation but also extend/modify some parts. This would not have been
+// possible without modifying the Kudu source code in Impala
+// (be/src/kudu/util/block_bloom_filter*). On the other hand, we have to maintain binary
+// compatibility between the Kudu code in Impala and actual Kudu code, so we decided
+// against modifying the code in be/src/kudu/util/block_bloom_filter*.
+#if defined(CLANG_TIDY)
+#define USE_AVX2 1
+#endif
+
+#include "util/parquet-bloom-filter.h"
+
+#include <immintrin.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+
+namespace impala {
+
+// A static helper function for the AVX2 methods. Turns a 32-bit hash into a 256-bit
+// Bucket with 1 single 1-bit set in each 32-bit lane.
+// NOTE(review): BLOOM_HASH_CONSTANTS is defined outside this file; it presumably expands
+// to the eight odd 32-bit multipliers mentioned below - confirm.
+static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i
+MakeMask(const uint32_t hash) {
+  const __m256i ones = _mm256_set1_epi32(1);
+  const __m256i rehash = _mm256_setr_epi32(BLOOM_HASH_CONSTANTS);
+  // Load hash into a YMM register, repeated eight times
+  __m256i hash_data = _mm256_set1_epi32(hash);
+  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
+  // odd constants, then keep the 5 most significant bits from each product.
+  hash_data = _mm256_mullo_epi32(rehash, hash_data);
+  hash_data = _mm256_srli_epi32(hash_data, 27);
+  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
+  return _mm256_sllv_epi32(ones, hash_data);
+}
+
+// AVX2 implementation of bucket insertion: ORs the mask derived from 'hash' into the
+// 256-bit bucket at index 'bucket_idx' of 'directory_'.
+// NOTE(review): _mm256_store_si256 requires a 32-byte aligned address, so 'directory_'
+// is presumably 32-byte aligned - confirm against the callers of Init().
+void ParquetBloomFilter::BucketInsertAVX2(const uint32_t bucket_idx,
+    const uint32_t hash) noexcept {
+  const __m256i mask = MakeMask(hash);
+  __m256i* const bucket = &(reinterpret_cast<__m256i*>(directory_)[bucket_idx]);
+  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
+  // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
+  // don't have to save them off before using XMM registers.
+  _mm256_zeroupper();
+}
+
+// AVX2 implementation of bucket lookup: returns true iff every bit set in the mask
+// derived from 'hash' is also set in the 256-bit bucket at index 'bucket_idx' of
+// 'directory_'.
+bool ParquetBloomFilter::BucketFindAVX2(const uint32_t bucket_idx,
+    const uint32_t hash) const noexcept {
+  const __m256i mask = MakeMask(hash);
+  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+  // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
+  // takes the negation of its first argument and ands that with its second argument. In
+  // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
+  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
+  const bool result = _mm256_testc_si256(bucket, mask);
+  // Clear the upper halves of the YMM registers, as is done after the AVX2 insertion.
+  _mm256_zeroupper();
+  return result;
+}
+
+} // namespace impala
diff --git a/be/src/util/parquet-bloom-filter-test.cc b/be/src/util/parquet-bloom-filter-test.cc
index 96570b1..7ae3449 100644
--- a/be/src/util/parquet-bloom-filter-test.cc
+++ b/be/src/util/parquet-bloom-filter-test.cc
@@ -67,7 +67,7 @@ BloomWrapper CreateBloomFilter(int ndv, double fpp) {
   BloomWrapper res;
   res.bloom = std::make_unique<ParquetBloomFilter>();
   res.storage = std::make_unique<vector<uint8_t>>(storage_size, 0);
-  Status status = res.bloom->Init(res.storage->data(), storage_size);
+  Status status = res.bloom->Init(res.storage->data(), storage_size, true);
   EXPECT_TRUE(status.ok()) << status.GetDetail();
   return res;
 }
@@ -113,7 +113,7 @@ TEST(ParquetBloomFilter, Find) {
   }
 }
 
-TEST(ParquetBloomFilter, TestHashAndFind) {
+TEST(ParquetBloomFilter, HashAndFind) {
   srand(0);
   for (int ndv = 100; ndv <= 100000; ndv *= 10) {
     BloomWrapper wrapper = CreateBloomFilter(ndv, 0.01);
diff --git a/be/src/util/parquet-bloom-filter.cc b/be/src/util/parquet-bloom-filter.cc
index 93116b9..d1f9366 100644
--- a/be/src/util/parquet-bloom-filter.cc
+++ b/be/src/util/parquet-bloom-filter.cc
@@ -41,6 +41,13 @@
 using namespace std;
 using strings::Substitute;
 
+// TODO: Reconcile with legacy AVX support.
+DEFINE_bool(disable_parquetbloomfilter_avx2, false,
+    "Disable AVX2 operations in ParquetBloomFilter. This flag has no effect if the "
+    "target CPU doesn't support AVX2 at run-time or ParquetBloomFilter was built with "
+    "a compiler that doesn't support AVX2.");
+DECLARE_bool(enable_legacy_avx_support);
+
 namespace impala {
 
 // This is needed to avoid undefined reference errors.
@@ -51,12 +58,28 @@ constexpr uint32_t ParquetBloomFilter::SALT[8] __attribute__((aligned(32)));
 ParquetBloomFilter::ParquetBloomFilter() :
   log_num_buckets_(0),
   directory_mask_(0),
-  directory_(nullptr) {
+  directory_(nullptr),
+  always_false_(false) {
+#ifdef USE_AVX2
+  if (has_avx2()) {
+    bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsertAVX2;
+    bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFindAVX2;
+  } else {
+    bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsert;
+    bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFind;
+  }
+#else
+  bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsert;
+  bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFind;
+#endif
+
+  DCHECK(bucket_insert_func_ptr_);
+  DCHECK(bucket_find_func_ptr_);
 }
 
 ParquetBloomFilter::~ParquetBloomFilter() {}
 
-Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
+Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size, bool always_false) {
   const int log_space_bytes = std::log2(dir_size);
   DCHECK_EQ(1ULL << log_space_bytes, dir_size);
 
@@ -70,8 +93,16 @@ Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
           log_space_bytes));
   }
   DCHECK_EQ(directory_size(), dir_size);
+  DCHECK(directory != nullptr);
   directory_ = reinterpret_cast<Bucket*>(directory);
 
+  if (always_false) {
+    // Check the assumption that the directory is empty.
+    DCHECK(std::all_of(directory, directory + dir_size,
+          [](uint8_t byte) { return byte == 0; }));
+    always_false_ = true;
+  }
+
   // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
   // that is too large.
   directory_mask_ = (1ULL << log_num_buckets_) - 1;
@@ -79,9 +110,11 @@ Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
 }
 
 void ParquetBloomFilter::Insert(const uint64_t hash) noexcept {
+  always_false_ = false;
   uint32_t idx = DetermineBucketIdx(hash);
   uint32_t hash_lower = hash;
-  BucketInsert(idx, hash_lower);
+  DCHECK(bucket_insert_func_ptr_);
+  (this->*bucket_insert_func_ptr_)(idx, hash_lower);
 }
 
 void ParquetBloomFilter::HashAndInsert(const uint8_t* input, size_t size) noexcept {
@@ -90,9 +123,11 @@ void ParquetBloomFilter::HashAndInsert(const uint8_t* input, size_t size) noexce
 }
 
 bool ParquetBloomFilter::Find(const uint64_t hash) const noexcept {
+  if (always_false_) return false;
   uint32_t idx = DetermineBucketIdx(hash);
   uint32_t hash_lower = hash;
-  return BucketFind(idx, hash_lower);
+  DCHECK(bucket_find_func_ptr_);
+  return (this->*bucket_find_func_ptr_)(idx, hash_lower);
 }
 
 bool ParquetBloomFilter::HashAndFind(const uint8_t* input, size_t size) const noexcept {
@@ -135,7 +170,6 @@ uint64_t ParquetBloomFilter::Hash(const uint8_t* input, size_t size) {
   return hash;
 }
 
-#ifdef __aarch64__
 ATTRIBUTE_NO_SANITIZE_INTEGER
 void ParquetBloomFilter::BucketInsert(const uint32_t bucket_idx,
     const uint32_t hash) noexcept {
@@ -170,45 +204,10 @@ bool ParquetBloomFilter::BucketFind(
   }
   return true;
 }
-#else
-// A static helper function for the AVX2 methods. Turns a 32-bit hash into a 256-bit
-// Bucket with 1 single 1-bit set in each 32-bit lane.
-static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i
-MakeMask(const uint32_t hash) {
-  const __m256i ones = _mm256_set1_epi32(1);
-  const __m256i rehash = _mm256_setr_epi32(BLOOM_HASH_CONSTANTS);
-  // Load hash into a YMM register, repeated eight times
-  __m256i hash_data = _mm256_set1_epi32(hash);
-  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
-  // odd constants, then keep the 5 most significant bits from each product.
-  hash_data = _mm256_mullo_epi32(rehash, hash_data);
-  hash_data = _mm256_srli_epi32(hash_data, 27);
-  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
-  return _mm256_sllv_epi32(ones, hash_data);
-}
-
-void ParquetBloomFilter::BucketInsert(const uint32_t bucket_idx,
-    const uint32_t hash) noexcept {
-  const __m256i mask = MakeMask(hash);
-  __m256i* const bucket = &(reinterpret_cast<__m256i*>(directory_)[bucket_idx]);
-  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
-  // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
-  // dont have to save them off before using XMM registers.
-  _mm256_zeroupper();
-}
 
-bool ParquetBloomFilter::BucketFind(const uint32_t bucket_idx,
-    const uint32_t hash) const noexcept {
-  const __m256i mask = MakeMask(hash);
-  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
-  // takes the negation of its first argument and ands that with its second argument. In
-  // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
-  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
-  const bool result = _mm256_testc_si256(bucket, mask);
-  _mm256_zeroupper();
-  return result;
+bool ParquetBloomFilter::has_avx2() {
+  return !FLAGS_disable_parquetbloomfilter_avx2 && !FLAGS_enable_legacy_avx_support
+      && CpuInfo::IsSupported(CpuInfo::AVX2);
 }
-#endif // #ifdef __aarch64__
 
 } // namespace impala
diff --git a/be/src/util/parquet-bloom-filter.h b/be/src/util/parquet-bloom-filter.h
index dfc6e91..c3404d0 100644
--- a/be/src/util/parquet-bloom-filter.h
+++ b/be/src/util/parquet-bloom-filter.h
@@ -38,7 +38,10 @@ class ParquetBloomFilter {
   /// Initialises the directory (bitset) of the Bloom filter. The data is not copied and
   /// is not owned by this object. The buffer must be valid as long as this object uses
   /// it.
-  Status Init(uint8_t* directory, size_t dir_size);
+  /// If 'always_false' is true, the implementation assumes that the directory is empty.
+  /// If the directory contains any bytes other than zero, 'always_false' should be
+  /// false.
+  Status Init(uint8_t* directory, size_t dir_size, bool always_false);
 
   void Insert(const uint64_t hash) noexcept;
   void HashAndInsert(const uint8_t* input, size_t size) noexcept;
@@ -48,6 +51,19 @@ class ParquetBloomFilter {
   bool Find(const uint64_t hash) const noexcept;
   bool HashAndFind(const uint8_t* input, size_t size) const noexcept;
 
+  const uint8_t* directory() const {
+    return reinterpret_cast<const uint8_t*>(directory_);
+  }
+
+  // Size of the internal directory structure in bytes.
+  int64_t directory_size() const {
+    return 1ULL << log_space_bytes();
+  }
+
+  bool AlwaysFalse() const {
+    return always_false_;
+  }
+
   static int OptimalByteSize(const size_t ndv, const double fpp);
 
   // If we expect to fill a Bloom filter with 'ndv' different unique elements and we
@@ -95,6 +111,9 @@ class ParquetBloomFilter {
   static constexpr uint32_t SALT[8]
       __attribute__((aligned(32))) = {BLOOM_HASH_CONSTANTS};
 
+  // Detect at run-time whether CPU supports AVX2
+  static bool has_avx2();
+
   // log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
   int log_num_buckets_;
 
@@ -104,30 +123,36 @@ class ParquetBloomFilter {
 
   Bucket* directory_;
 
-/// AVX2 is not available on ARM so we have to implement the functions differently.
-#ifdef __aarch64__
+  // Indicates whether the Bloom filter is empty and therefore all *Find* calls will
+  // return false without further checks.
+  bool always_false_;
+
   // Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
   // into and 'hash' is the value passed to Insert().
-  void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept;
-  bool BucketFind(const uint32_t bucket_idx, const uint32_t hash) const noexcept;
-#else
-  // Same as above but using AVX2.
-  void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept
-    __attribute__((__target__("avx2")));
-  bool BucketFind(const uint32_t bucket_idx, const uint32_t hash) const noexcept
-    __attribute__((__target__("avx2")));
+  void BucketInsert(uint32_t bucket_idx, uint32_t hash) noexcept;
+
+  bool BucketFind(uint32_t bucket_idx, uint32_t hash) const noexcept;
+
+#ifdef USE_AVX2
+  // A faster SIMD version of BucketInsert().
+  void BucketInsertAVX2(const uint32_t bucket_idx, const uint32_t hash) noexcept
+      __attribute__((__target__("avx2")));
+
+  // A faster SIMD version of BucketFind().
+  bool BucketFindAVX2(const uint32_t bucket_idx, const uint32_t hash) const noexcept
+      __attribute__((__target__("avx2")));
 #endif
 
+  // Function pointers initialized in the constructor to avoid run-time cost in hot-path
+  // of Find and Insert operations.
+  decltype(&ParquetBloomFilter::BucketInsert) bucket_insert_func_ptr_;
+  decltype(&ParquetBloomFilter::BucketFind) bucket_find_func_ptr_;
+
   // Returns amount of space used in log2 bytes.
   int log_space_bytes() const {
     return log_num_buckets_ + kLogBucketByteSize;
   }
 
-  // Size of the internal directory structure in bytes.
-  int64_t directory_size() const {
-    return 1ULL << log_space_bytes();
-  }
-
   uint32_t DetermineBucketIdx(const uint64_t hash) const noexcept {
     const uint64_t hash_top_bits = hash >> 32;
     const uint64_t num_buckets = 1ULL << log_num_buckets_;
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 1a9cdfe..03f6a69 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -97,6 +97,10 @@ struct THdfsTableSink {
   // Indicates how deep into the partition specification in which to start creating
   // partition directories
   10: optional i32 external_output_partition_depth;
+
+  // Mapping from column names to Parquet Bloom filter bitset sizes. Columns for which no
+  // Parquet Bloom filter should be written should not be listed here.
+  11: optional map<string, i64> parquet_bloom_filter_col_info;
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 94109e0..ad0d1fc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -684,6 +684,15 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to use min/max filtering on partition columns
   MINMAX_FILTER_PARTITION_COLUMNS = 133
+
+  // Controls when to write Parquet Bloom filters.
+  //     NEVER      - never write Parquet Bloom filters
+  //     IF_NO_DICT - write Parquet Bloom filters if specified in the table
+  //                  properties AND the row group is not fully dictionary encoded
+  //     ALWAYS     - always write Parquet Bloom filters if specified in the table
+  //                  properties, even if the row group is fully dictionary
+  //                  encoded
+  PARQUET_BLOOM_FILTER_WRITE = 134
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index d061578..f5c3cd0 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -85,6 +85,20 @@ enum TMinmaxFilterFastCodePathMode {
   VERIFICATION=2
 }
 
+// Options for when to write Parquet Bloom filters for supported types.
+enum TParquetBloomFilterWrite {
+  // Never write Parquet Bloom filters.
+  NEVER,
+
+  // Write Parquet Bloom filters if specified in the table properties AND the row group
+  // is not fully dictionary encoded.
+  IF_NO_DICT,
+
+  // Always write Parquet Bloom filters if specified in the table properties,
+  // even if the row group is fully dictionary encoded.
+  ALWAYS
+}
+
 // constants for TQueryOptions.num_nodes
 const i32 NUM_NODES_ALL = 0
 const i32 NUM_NODES_ALL_RACKS = -1
@@ -528,6 +542,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   134: optional bool minmax_filter_partition_columns = true;
+
+  // See comment in ImpalaService.thrift
+  135: optional TParquetBloomFilterWrite parquet_bloom_filter_write =
+      TParquetBloomFilterWrite.IF_NO_DICT;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 0fc4e96..76b68a1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -18,6 +18,8 @@
 package org.apache.impala.planner;
 
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -36,18 +38,36 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+import org.apache.impala.util.BitUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Sink for inserting into filesystem-backed tables.
  *
  * TODO(vercegovac): rename to FsTableSink
  */
 public class HdfsTableSink extends TableSink {
+  private final static Logger LOG = LoggerFactory.getLogger(HdfsTableSink.class);
+
+  // The name of the table property that sets the parameters of writing Parquet Bloom
+  // filters.
+  public static final String PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY =
+    "parquet.bloom.filter.columns";
+
+  // These constants are the maximal and minimal size of the bitset of a
+  // ParquetBloomFilter object (in the BE). These should be kept in sync with the values
+  // in be/src/util/parquet-bloom-filter.h.
+  public static final long PARQUET_BLOOM_FILTER_MAX_BYTES = 128 * 1024 * 1024;
+  public static final long PARQUET_BLOOM_FILTER_MIN_BYTES = 64;
+
   // Default number of partitions used for computeResourceProfile() in the absence of
   // column stats.
   protected final long DEFAULT_NUM_PARTITIONS = 10;
@@ -221,6 +241,55 @@ public class HdfsTableSink extends TableSink {
     return "HDFS WRITER";
   }
 
+  // The table property has the following format: a comma separated list of
+  // 'col_name:bitset_size' pairs. The 'bitset_size' part means the size of the bitset of
+  // the Bloom filter, and is optional. Values will be rounded up to the smallest power of
+  // 2 not less than the given number. If the size is not given, it will be the maximal
+  // Bloom filter size (PARQUET_BLOOM_FILTER_MAX_BYTES). No Bloom filter will be written
+  // for columns not listed here.
+  // Example: "col1:1024,col2,col4:100".
+  @VisibleForTesting
+  static Map<String, Long> parseParquetBloomFilterWritingTblProp(final String tbl_prop) {
+    Map<String, Long> result = new HashMap<>();
+    String[] colSizePairs = tbl_prop.split(",");
+    for (String colSizePair : colSizePairs) {
+      String[] tokens = colSizePair.split(":");
+
+      if (tokens.length == 0 || tokens.length > 2) {
+        String err = "Invalid token in table property "
+          + PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY + ": "
+          + colSizePair.trim()
+          + ". Expected either a column name or a column name and a size "
+          + "separated by a colon (':').";
+        LOG.warn(err);
+        return null;
+      }
+
+      long size;
+      if (tokens.length == 1) {
+        size = PARQUET_BLOOM_FILTER_MAX_BYTES;
+      } else {
+        assert tokens.length == 2;
+        try {
+          size = Long.parseLong(tokens[1].trim());
+        } catch (NumberFormatException e) {
+          String err =
+                "Invalid bitset size in table property "
+                + PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY + ": "
+                + tokens[1].trim();
+          LOG.warn(err);
+          return null;
+        }
+
+        size = Long.max(PARQUET_BLOOM_FILTER_MIN_BYTES, size);
+        size = Long.min(PARQUET_BLOOM_FILTER_MAX_BYTES, size);
+        size = BitUtil.roundUpToPowerOf2(size);
+      }
+      result.put(tokens[0].trim(), size);
+    }
+    return result;
+  }
+
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     THdfsTableSink hdfsTableSink = new THdfsTableSink(
@@ -234,6 +303,16 @@ public class HdfsTableSink extends TableSink {
     if (skipHeaderLineCount > 0) {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
+
+    org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
+    Map<String, String> params = msTbl.getParameters();
+    String parquetBloomTblProp = params.get(PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY);
+    if (parquetBloomTblProp != null) {
+      Map<String, Long> parsedProperties = parseParquetBloomFilterWritingTblProp(
+          parquetBloomTblProp);
+      hdfsTableSink.setParquet_bloom_filter_col_info(parsedProperties);
+    }
+
     hdfsTableSink.setSort_columns(sortColumns_);
     hdfsTableSink.setSorting_order(sortingOrder_);
     hdfsTableSink.setIs_result_sink(isResultSink_);
diff --git a/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java b/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java
new file mode 100644
index 0000000..b0ede1a
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Random;
+
+public class ParquetBloomFilterTblPropParserTest {
+  private static final Logger LOG = Logger.getLogger(
+      ParquetBloomFilterTblPropParserTest.class);
+
+  @Test
+  public void testParsingOnlyColNames() throws Exception {
+    final String props = "col1,col2,col3";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES
+      );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingAllSizesGiven() {
+    final String props = "col1:128,col2:256,col3:64";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128l,
+      "col2", 256l,
+      "col3", 64l
+      );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingSomeSizesGiven() {
+    final String props = "col1:128,col2,col3:64";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128l,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", 64l
+    );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingContainsWhitespace() {
+    final String props = "col1 : 128, col2, \ncol3: 64 \t";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128l,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", 64l
+    );
+
+    parseAndCheck(props, exp);
+  }
+
+  private void parseAndCheck(final String tbl_props, final Map<String, Long> exp_res) {
+    final Map<String, Long> res = HdfsTableSink.parseParquetBloomFilterWritingTblProp(
+        tbl_props);
+    assertEquals(exp_res, res);
+  }
+}
diff --git a/tests/query_test/test_parquet_bloom_filter.py b/tests/query_test/test_parquet_bloom_filter.py
index 8ae2eac..998cfd3 100644
--- a/tests/query_test/test_parquet_bloom_filter.py
+++ b/tests/query_test/test_parquet_bloom_filter.py
@@ -15,8 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import math
+import os
+
+from collections import namedtuple
+from parquet.ttypes import BloomFilterHeader
+from subprocess import check_call
+
 from tests.common.file_utils import create_table_and_copy_files
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    get_parquet_metadata,
+    read_serialized_object
+)
 
 
 class TestParquetBloomFilter(ImpalaTestSuite):
@@ -24,6 +36,35 @@ class TestParquetBloomFilter(ImpalaTestSuite):
   This suite tests Parquet Bloom filter optimizations.
   """
 
+  # Filename relative to $IMPALA_HOME. Some functions ('create_table_and_copy_files')
+  # prepend $IMPALA_HOME automatically, so we cannot include it here. Other functions,
+  # like 'get_parquet_metadata', need a full path.
+  PARQUET_TEST_FILE = 'testdata/data/parquet-bloom-filtering.parquet'
+  BloomFilterData = namedtuple('BloomFilterData', ['header', 'directory'])
+
+  # The statement used to create the test tables.
+  create_stmt = 'create table {db}.{tbl} (        '\
+                '  int8_col TINYINT,              '\
+                '  int16_col SMALLINT,            '\
+                '  int32_col INT,                 '\
+                '  int64_col BIGINT,              '\
+                '  float_col FLOAT,               '\
+                '  double_col DOUBLE,             '\
+                '  string_col STRING,             '\
+                '  char_col VARCHAR(3)            '\
+                ')                                '\
+                'stored as parquet                '
+  set_tbl_props_stmt = 'alter table {db}.{tbl} '                            \
+                       'set TBLPROPERTIES ("parquet.bloom.filter.columns"="'\
+                       'int8_col   : 512,'                                  \
+                       'int16_col  : 512,'                                  \
+                       'int32_col  : 512,'                                  \
+                       'int64_col  : 512,'                                  \
+                       'float_col  : 512,'                                  \
+                       'double_col : 512,'                                  \
+                       'string_col : 512,'                                  \
+                       '");'
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -36,7 +77,7 @@ class TestParquetBloomFilter(ImpalaTestSuite):
 
   def test_parquet_bloom_filtering(self, vector, unique_database):
     """ Tests that Parquet Bloom filtering works when it is enabled. """
-    self.create_test_database(unique_database)
+    self._create_test_table_from_file(str(unique_database), self.PARQUET_TEST_FILE)
 
     # The test makes assumptions about the number of row groups that are processed and
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
@@ -44,9 +85,9 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = True
     self.run_test_case('QueryTest/parquet-bloom-filter', vector, unique_database)
 
-  def test_parquet_bloom_filtering_off(self, vector, unique_database):
+  def test_parquet_bloom_filtering_disabled(self, vector, unique_database):
     """ Check that there is no Parquet Bloom filtering when it is disabled. """
-    self.create_test_database(unique_database)
+    self._create_test_table_from_file(str(unique_database), self.PARQUET_TEST_FILE)
 
     # The test makes assumptions about the number of row groups that are processed and
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
@@ -54,18 +95,251 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = False
     self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, unique_database)
 
-  def create_test_database(self, unique_database):
-    create_stmt = 'create table {db}.{tbl} (        '\
-                  '  int8_col TINYINT,              '\
-                  '  int16_col SMALLINT,            '\
-                  '  int32_col INT,                 '\
-                  '  int64_col BIGINT,              '\
-                  '  float_col FLOAT,               '\
-                  '  double_col DOUBLE,             '\
-                  '  string_col STRING,             '\
-                  '  char_col VARCHAR(3)            '\
-                  ')                                '\
-                  'stored as parquet                '
-    create_table_and_copy_files(self.client, create_stmt,
-                                unique_database, 'parquet_bloom_filter',
-                                ['testdata/data/parquet-bloom-filtering.parquet'])
+  def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir):
+    """ Writes a table with Impala that holds the same data as the first row group of
+    PARQUET_TEST_FILE and checks that the Bloom filters in the written file are equal to
+    the ones stored in PARQUET_TEST_FILE. """
+    db_name = str(unique_database)
+    tbl_name = 'parquet_bloom_filter_writing'
+
+    # The expected Bloom filters come from the first row group of PARQUET_TEST_FILE.
+    expected_filters = self._get_first_row_group_bloom_filters(self.PARQUET_TEST_FILE)
+
+    # Write a new Parquet file with the same data as in PARQUET_TEST_FILE.
+    table_dir = self._create_empty_test_table(vector, db_name, tbl_name)
+    self._set_tbl_props_to_match_test_file(vector, db_name, tbl_name)
+    self._populate_table(vector, db_name, tbl_name)
+
+    # Fetch the written file(s) and read back their Bloom filters.
+    filters_per_file = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        table_dir, tmpdir)
+    # We have written a single row group, so there should be exactly one file.
+    assert len(filters_per_file) == 1
+    self._compare_bloom_filters_to_reference(expected_filters, filters_per_file[0])
+
+    # Look up a value that is present in 'column_name' (0) and one that is not (1). For
+    # the present value no row group should be skipped. For the absent value the row
+    # group should be skipped by dictionary filtering and not by Bloom filtering: all
+    # the elements fit in the dictionary, and if there is a dictionary we use that
+    # instead of the Bloom filter.
+    not_bloom_filtered = 'NumBloomFilteredRowGroups: 0 (0)'
+    bloom_filtered = 'NumBloomFilteredRowGroups: 1 (1)'
+    column_name = 'int64_col'
+    self._query_element_check_profile(vector, db_name, tbl_name, column_name,
+        0, [not_bloom_filtered], [bloom_filtered])
+    self._query_element_check_profile(vector, db_name, tbl_name, column_name,
+        1, [not_bloom_filtered, 'NumDictFilteredRowGroups: 1 (1)'],
+        [bloom_filtered, 'NumDictFilteredRowGroups: 0 (0)'])
+
+  def test_fallback_from_dict(self, vector, unique_database, tmpdir):
+    """ Tests falling back from dict encoding to plain encoding and using a Bloom filter
+    after reaching the max dict size. """
+    tbl_name = 'fallback_from_dict'
+    column_name = 'col'
+    self._create_table_dict_overflow(vector, str(unique_database), tbl_name,
+        column_name, True)
+
+    # Get the created Parquet file and extract the Bloom filters.
+    hdfs_path = self._get_hdfs_path(str(unique_database), tbl_name)
+    col_idx_to_bloom_filter_list = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        hdfs_path, tmpdir)
+    # There should be exactly one file as we have written one row group.
+    assert len(col_idx_to_bloom_filter_list) == 1
+    col_idx_to_bloom_filter = col_idx_to_bloom_filter_list[0]
+    # The index of the only column is 0.
+    bloom_filter_data = col_idx_to_bloom_filter[0]
+    bitset = bloom_filter_data.directory
+
+    # We should have inserted 'max_dict' + 1 elements into the Bloom filter when falling
+    # back. If the implementation is incorrect and we did not copy the elements in the
+    # dictionary of the dict encoding to the Bloom filter, only 1 element will have been
+    # inserted, meaning that exactly 8 bytes have non-zero values. If there are more
+    # non-zero bytes we can assume that the implementation does not have this error.
+    assert isinstance(bitset, bytes)
+    # Iterate over a bytearray so that we get integers on both Python 2 and Python 3.
+    # Iterating over 'bytes' yields one-character strings on Python 2; those never
+    # compare equal to 0, so every byte would be counted and the check below would
+    # pass even for an all-zero bitset.
+    nonzero = sum(1 for byte in bytearray(bitset) if byte != 0)
+    assert nonzero > 8
+
+    # Query an element that is and one that is not present in the table and check whether
+    # we correctly do not skip and skip the row group, respectively.
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        0, ['NumBloomFilteredRowGroups: 0 (0)'], ['NumBloomFilteredRowGroups: 1 (1)'])
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        1, ['NumBloomFilteredRowGroups: 1 (1)'], ['NumBloomFilteredRowGroups: 0 (0)'])
+
+  def test_fallback_from_dict_if_no_bloom_tbl_props(self, vector, unique_database,
+      tmpdir):
+    """ Checks that no Bloom filter is written when we fall back from dict encoding to
+    plain encoding after reaching the max dict size but Bloom filtering is not enabled
+    in the table properties. """
+    db_name = str(unique_database)
+    tbl_name = 'fallback_from_dict'
+    self._create_table_dict_overflow(vector, db_name, tbl_name, 'col', False)
+
+    # Fetch the Parquet file that was written and read its Bloom filters.
+    filters_per_file = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        self._get_hdfs_path(db_name, tbl_name), tmpdir)
+    # We have written a single row group, so there should be exactly one file.
+    assert len(filters_per_file) == 1
+    # No column should have a Bloom filter.
+    assert len(filters_per_file[0]) == 0
+
+  def _query_element_check_profile(self, vector, db_name, tbl_name, col_name,
+          element, strings_in_profile, strings_not_in_profile):
+    """ Selects the rows of 'db_name.tbl_name' in which 'col_name' equals 'element' and
+    asserts that every string in 'strings_in_profile' occurs in the runtime profile of
+    the query and that none of the strings in 'strings_not_in_profile' does. This can be
+    used for example to check whether the Bloom filter was used. """
+    query = 'select {col_name} from {db}.{tbl} where {col_name} = {value}'.format(
+        col_name=col_name, db=db_name, tbl=tbl_name, value=element)
+    profile = self.execute_query(query, vector.get_value('exec_option')).runtime_profile
+
+    for expected in strings_in_profile:
+      assert expected in profile
+    for unexpected in strings_not_in_profile:
+      assert unexpected not in profile
+
+  def _create_table_dict_overflow(self, vector, db_name, tbl_name, column_name,
+      bloom_tbl_prop):
+    """ Creates the Parquet table 'db_name.tbl_name' with a single BIGINT column called
+    'column_name' and inserts 'max_dict_size' + 1 distinct even numbers into it with
+    'parquet_bloom_filter_write' set to IF_NO_DICT. If 'bloom_tbl_prop' is true, the
+    table is created with the "parquet.bloom.filter.columns" table property set for the
+    column. """
+    max_dict_size = 40000
+    num_distinct = max_dict_size + 1
+
+    # The table property for Bloom filtering is only appended on request; the bitset
+    # size is computed for 'num_distinct' values and a false positive rate of 0.05.
+    create_parts = ['create table {db}.{tbl} ({col_name} BIGINT) stored as parquet']
+    if bloom_tbl_prop:
+      create_parts.append(
+          ' TBLPROPERTIES("parquet.bloom.filter.columns"="{col_name}:{size}")')
+    create_stmt = ''.join(create_parts).format(db=db_name, tbl=tbl_name,
+        col_name=column_name, size=self._optimal_bitset_size(num_distinct, 0.05))
+
+    # Only even numbers are inserted so that an odd number should be filtered out based
+    # on the Bloom filter.
+    rows = ','.join('({})'.format(2 * i) for i in range(num_distinct))
+    insert_stmt = 'insert into {db}.{tbl} values {values}'.format(
+        db=db_name, tbl=tbl_name, values=rows)
+
+    exec_options = vector.get_value('exec_option')
+    exec_options['num_nodes'] = 1
+    exec_options['parquet_bloom_filter_write'] = 'IF_NO_DICT'
+    for stmt in (create_stmt, insert_stmt):
+      self.execute_query(stmt, exec_options)
+
+  def _optimal_bitset_size(self, ndv, fpp):
+    """ Returns the bitset size, rounded up to a power of two, for a Bloom filter that
+    holds 'ndv' distinct values with false positive probability 'fpp'. Based on
+    ParquetBloomFilter::OptimalByteSize() in be/src/util/parquet-bloom-filter.h """
+    if ndv == 0:
+      return 1
+    words_in_bucket = 8.0
+    fpp_root = math.pow(fpp, 1.0 / words_in_bucket)
+    m = -words_in_bucket * ndv / math.log(1 - fpp_root)
+    # Round the base 2 logarithm up, but never go below 2 ** 0.
+    log_size = math.ceil(math.log(m / 8, 2))
+    return int(2 ** max(0, log_size))
+
+  def _create_test_table_from_file(self, db_name, filename):
+    """ Calls create_table_and_copy_files() with 'self.create_stmt' to create the table
+    'parquet_bloom_filter' in database 'db_name' from the single file 'filename'. """
+    tbl_name = 'parquet_bloom_filter'
+    files_to_copy = [filename]
+    create_table_and_copy_files(
+        self.client, self.create_stmt, db_name, tbl_name, files_to_copy)
+
+  def _create_empty_test_table(self, vector, db_name, tbl_name):
+    """ Drops 'db_name.tbl_name' if it exists, creates it again with 'self.create_stmt'
+    ('num_nodes' is set to 1) and returns the path of the table as given by
+    _get_hdfs_path(). """
+    self.execute_query("drop table if exists {0}.{1}".format(db_name, tbl_name))
+    exec_options = vector.get_value('exec_option')
+    exec_options['num_nodes'] = 1
+    self.execute_query(self.create_stmt.format(db=db_name, tbl=tbl_name), exec_options)
+    return self._get_hdfs_path(db_name, tbl_name)
+
+  def _get_hdfs_path(self, db_name, tbl_name):
+    """ Returns what get_fs_path() gives for the directory of table 'tbl_name' of
+    database 'db_name' under '/test-warehouse'. """
+    warehouse_dir = '/test-warehouse/{0}.db/{1}/'.format(db_name, tbl_name)
+    return get_fs_path(warehouse_dir)
+
+  def _set_tbl_props_to_match_test_file(self, vector, db_name, tbl_name):
+    """ Executes 'self.set_tbl_props_stmt' for the table 'db_name.tbl_name' with
+    'num_nodes' set to 1. """
+    exec_options = vector.get_value('exec_option')
+    exec_options['num_nodes'] = 1
+    alter_stmt = self.set_tbl_props_stmt.format(db=db_name, tbl=tbl_name)
+    self.execute_query(alter_stmt, exec_options)
+
+  def _populate_table(self, vector, db_name, tbl_name):
+    """ Inserts 100 rows into 'db_name.tbl_name' with 'parquet_bloom_filter_write' set
+    to ALWAYS. The rows hold even numbers, as in the first row group in the file
+    PARQUET_TEST_FILE. """
+    # Every column of a row gets the same number. The whitespace inside this literal is
+    # part of the statement text, so the continuation line must keep its indentation.
+    row_template = '({0}, {0}, {0}, {0}, {0}.0, {0}.0, \
+          "{0}", cast("{0}" as VARCHAR(3)))'
+    rows = [row_template.format((i * 2) % 128) for i in range(100)]
+
+    insert_format = 'insert into {db}.{tbl} values {{values}}'.format(
+        db=db_name, tbl=tbl_name)
+    exec_options = vector.get_value('exec_option')
+    exec_options['num_nodes'] = 1
+    exec_options['parquet_bloom_filter_write'] = 'ALWAYS'
+    self.execute_query(insert_format.format(values=", ".join(rows)), exec_options)
+
+  def _get_first_row_group_bloom_filters_from_hdfs_dir(self, hdfs_path, tmpdir):
+    """ Fetches 'hdfs_path' into 'tmpdir' with 'hdfs dfs -get' and returns a list that
+    holds, for each file found under 'tmpdir', the Bloom filters of its first row group
+    as returned by _get_first_row_group_bloom_filters(). """
+    local_dir = tmpdir.strpath
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, local_dir])
+    return [self._get_first_row_group_bloom_filters(os.path.join(dirpath, str(name)))
+            for dirpath, _subdirs, names in os.walk(local_dir)
+            for name in names]
+
+  def _get_first_row_group_bloom_filters(self, parquet_file):
+    """ Returns a dict that maps the index of each column in the first row group of
+    'parquet_file' to the Bloom filter read for it by _try_read_bloom_filter(). Columns
+    without a Bloom filter offset, or whose Bloom filter header cannot be read, are left
+    out. 'parquet_file' is either absolute or relative to $IMPALA_HOME. """
+    # Other helpers take a filename relative to $IMPALA_HOME and prepend $IMPALA_HOME
+    # themselves; the ones used here do not, so we have to prepend it ourselves.
+    # os.path.join() returns 'parquet_file' unchanged if it is already an absolute path.
+    filename = os.path.join(os.environ['IMPALA_HOME'],
+        parquet_file)
+    file_meta_data = get_parquet_metadata(filename)
+    # We only support flat schemas, the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    # We are only interested in the first row group.
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    col_to_bloom_filter = dict()
+    # The Bloom filter header and bitset are raw bytes, so the file has to be opened in
+    # binary mode; text mode would decode the data on Python 3.
+    with open(filename, 'rb') as file_handle:
+      for i, column in enumerate(row_group.columns):
+        column_meta_data = column.meta_data
+        if column_meta_data and column_meta_data.bloom_filter_offset:
+          bloom_filter = self._try_read_bloom_filter(file_handle,
+              column_meta_data.bloom_filter_offset)
+          if bloom_filter:
+            col_to_bloom_filter[i] = bloom_filter
+    return col_to_bloom_filter
+
+  def _try_read_bloom_filter(self, file_handle, bloom_filter_offset):
+    """ Reads the Bloom filter located at 'bloom_filter_offset' in 'file_handle'.
+    Returns a BloomFilterData built from the header and the 'header.numBytes' bytes
+    read after it, or None if the header could not be read. """
+    header, header_len = self._try_read_bloom_filter_header(
+        file_handle, bloom_filter_offset)
+    if header is not None:
+      # The bitset is read from right after the size reported for the header.
+      file_handle.seek(bloom_filter_offset + header_len)
+      return self.BloomFilterData(header, file_handle.read(header.numBytes))
+    return None
+
+  def _try_read_bloom_filter_header(self, file_handle, bloom_filter_offset):
+    """ Tries to read a BloomFilterHeader at 'bloom_filter_offset' in 'file_handle',
+    passing lengths of 8, 16, ..., 1024 to read_serialized_object() until one does not
+    raise EOFError. Returns the header and the length that succeeded. If no length
+    succeeds, None is returned instead of the header and the size is unspecified. """
+    # NOTE(review): the returned size is the probe length, which is presumably not
+    # always the exact serialized size of the header although the caller uses it as
+    # the offset of the bitset - confirm against read_serialized_object().
+    found = None
+    probe_len = 8
+    while found is None:
+      if probe_len > 1024:
+        break
+      try:
+        found = read_serialized_object(BloomFilterHeader, file_handle,
+          bloom_filter_offset, probe_len)
+      except EOFError:
+        # The probe was too short to hold the whole header; retry with twice as much.
+        probe_len *= 2
+    return (found, probe_len)
+
+  def _compare_bloom_filters_to_reference(self,
+       reference_col_to_bloom_filter, col_to_bloom_filter):
+    """ Asserts that 'col_to_bloom_filter' has entries for exactly the columns 0-6 and
+    that the header and the directory of each entry equal those of the same column in
+    'reference_col_to_bloom_filter'. """
+    cols_with_filter = list(range(7))
+    assert sorted(col_to_bloom_filter.keys()) == cols_with_filter,\
+        "All columns except the last one (VARCHAR(3)) should have a Bloom filter."
+    for col_idx in cols_with_filter:
+      (ref_header, ref_directory) = reference_col_to_bloom_filter[col_idx]
+      (actual_header, actual_directory) = col_to_bloom_filter[col_idx]
+
+      assert ref_header == actual_header
+      assert ref_directory == actual_directory,\
+          "Incorrect directory for Bloom filter for column no. {}.".format(col_idx)