Posted to commits@impala.apache.org by bo...@apache.org on 2021/07/14 13:11:48 UTC

[impala] branch master updated (77283d8 -> 9d46255)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 77283d8  IMPALA-10703: Fix crash on reading ACID table while printing SchemaPath of tuple/slots.
     new 85ebfe2  IMPALA-10490: Fix illegalStateException in drop stats
     new a5de2ac  IMPALA-10642: Write support for Parquet Bloom filters - most common types
     new 9d46255  IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/hdfs-table-sink.cc                     |   4 +
 be/src/exec/hdfs-table-sink.h                      |   8 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   2 +-
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 247 ++++++++++++++++
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   5 +
 be/src/exec/parquet/parquet-bloom-filter-util.cc   |  10 +
 be/src/exec/parquet/parquet-bloom-filter-util.h    |   3 +
 be/src/exec/parquet/parquet-column-readers.cc      | 142 ++++------
 be/src/exec/parquet/parquet-column-stats.cc        |  43 ++-
 be/src/exec/parquet/parquet-column-stats.h         |   9 +-
 be/src/exec/parquet/parquet-data-converter.h       | 184 ++++++++++++
 be/src/exec/parquet/parquet-metadata-utils.cc      |  30 +-
 be/src/exprs/decimal-operators-ir.cc               |  18 +-
 be/src/runtime/decimal-value.h                     |   4 +-
 be/src/runtime/decimal-value.inline.h              |   8 +-
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   4 +-
 be/src/util/CMakeLists.txt                         |  26 ++
 be/src/util/debug-util.cc                          |   2 +-
 be/src/util/debug-util.h                           |   1 +
 be/src/util/dict-encoding.h                        |  14 +
 .../parquet-bloom-filter-avx2.cc}                  |  61 ++--
 be/src/util/parquet-bloom-filter-test.cc           |   4 +-
 be/src/util/parquet-bloom-filter.cc                |  85 +++---
 be/src/util/parquet-bloom-filter.h                 |  57 ++--
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/ImpalaService.thrift                 |   9 +
 common/thrift/Query.thrift                         |  18 ++
 .../org/apache/impala/planner/HdfsTableSink.java   |  79 ++++++
 .../apache/impala/service/CatalogOpExecutor.java   |  12 +
 .../impala/service/IcebergCatalogOpExecutor.java   |   8 +-
 .../ParquetBloomFilterTblPropParserTest.java       |  88 ++++++
 testdata/data/README                               |   5 +
 ...ry_decimal_precision_and_scale_widening.parquet | Bin 0 -> 984 bytes
 .../queries/QueryTest/iceberg-alter.test           |  21 ++
 .../queries/QueryTest/iceberg-negative.test        |   6 -
 ...rquet-decimal-precision-and-scale-altering.test | 236 ++++++++++++++++
 ...rquet-decimal-precision-and-scale-widening.test | 104 +++++++
 tests/custom_cluster/test_event_processing.py      |  17 +-
 tests/query_test/test_parquet_bloom_filter.py      | 310 +++++++++++++++++++--
 tests/query_test/test_scanners.py                  |  55 ++++
 41 files changed, 1676 insertions(+), 274 deletions(-)
 create mode 100644 be/src/exec/parquet/parquet-data-converter.h
 copy be/src/{kudu/util/block_bloom_filter_avx2.cc => util/parquet-bloom-filter-avx2.cc} (64%)
 create mode 100644 fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java
 create mode 100644 testdata/data/binary_decimal_precision_and_scale_widening.parquet
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test

[impala] 02/03: IMPALA-10642: Write support for Parquet Bloom filters - most common types

Posted by bo...@apache.org.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a5de2acc47723fdaee4ebe6d904d16be505b7cfb
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Thu Mar 18 17:10:49 2021 +0100

    IMPALA-10642: Write support for Parquet Bloom filters - most common types
    
    This change adds support for writing Parquet Bloom filters for the types
    for which read support was added in IMPALA-10640.
    
    Writing of Parquet Bloom filters can be controlled by the
    'parquet_bloom_filter_write' query option and the
    'parquet.bloom.filter.columns' table property. The query option has the
    following possible values:
      NEVER      - never write Parquet Bloom filters
      IF_NO_DICT - write Parquet Bloom filters if specified in the table
                   properties AND if the row group is not fully
                   dictionary encoded (the number of distinct values exceeds
                   the maximum dictionary size)
      ALWAYS     - always write Parquet Bloom filters if specified in the
                   table properties, even if the row group is fully
                   dictionary encoded
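
The three option values above can be read as a small decision function. The following is a minimal sketch of that reading, not code from this commit: the enum `BloomFilterWrite` and the function `ShouldWriteBloomFilter` are hypothetical names, and the set of column names stands in for the columns listed in the 'parquet.bloom.filter.columns' table property.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for the three values of 'parquet_bloom_filter_write'.
enum class BloomFilterWrite { NEVER, IF_NO_DICT, ALWAYS };

// Returns true if a Bloom filter would be written for 'column' in a row group.
// 'bloom_columns' models the columns named in the table property.
// 'fully_dict_encoded' is true when all values of the row group fit in the
// dictionary, i.e. no fallback to plain encoding happened.
bool ShouldWriteBloomFilter(BloomFilterWrite mode,
    const std::set<std::string>& bloom_columns, const std::string& column,
    bool fully_dict_encoded) {
  // NEVER overrides the table property.
  if (mode == BloomFilterWrite::NEVER) return false;
  // Columns not listed in the table property never get a filter.
  if (bloom_columns.count(column) == 0) return false;
  // ALWAYS writes the filter regardless of the encoding.
  if (mode == BloomFilterWrite::ALWAYS) return true;
  // IF_NO_DICT writes the filter only if the dictionary was not sufficient.
  assert(mode == BloomFilterWrite::IF_NO_DICT);
  return !fully_dict_encoded;
}
```

Under this reading, a column that is absent from the table property gets no filter even with ALWAYS, which matches the "if specified in the table properties" wording above.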
    
    The 'parquet.bloom.filter.columns' table property is a comma-separated
    list of 'col_name:bytes' pairs. The 'bytes' part means the size of the
    bitset of the Bloom filter, and is optional. If the size is not given,
    it will be the maximal Bloom filter size
    (ParquetBloomFilter::MAX_BYTES).
    Example: "col1:1024,col2,col4:100".
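
To illustrate the property format, here is a minimal parsing sketch. It is an assumption-laden illustration rather than the parser added by this commit (that one lives in the Java frontend): `ParseBloomFilterColumns` is a hypothetical name, `default_bytes` stands in for ParquetBloomFilter::MAX_BYTES, and whitespace trimming and size validation are omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>

// Parses a comma-separated list of 'col_name:bytes' pairs into a map from
// column name to bitset size. Entries without ':bytes' get 'default_bytes'.
// Hypothetical helper: std::stoll throws on a non-numeric size, and no
// minimum, maximum or power-of-two checks are made here.
std::map<std::string, int64_t> ParseBloomFilterColumns(
    const std::string& prop, int64_t default_bytes) {
  assert(default_bytes > 0);
  std::map<std::string, int64_t> result;
  std::istringstream stream(prop);
  std::string entry;
  while (std::getline(stream, entry, ',')) {
    if (entry.empty()) continue;  // Skip empty entries such as in "a,,b".
    const auto colon = entry.find(':');
    if (colon == std::string::npos) {
      // No size given: fall back to the default (maximal) size.
      result[entry] = default_bytes;
    } else {
      result[entry.substr(0, colon)] = std::stoll(entry.substr(colon + 1));
    }
  }
  return result;
}
```

With the example string above and a default of 4096, this sketch would map col1 to 1024, col2 to 4096 and col4 to 100. The backend code in this change expects sizes that are powers of two within the minimum and maximum bounds and leaves enforcing that to the frontend, so a real parser would have to validate or adjust a value such as 100.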
    
    Testing:
      - Added a test in tests/query_test/test_parquet_bloom_filter.py that
        uses Impala to write the same table as in the test file
        'testdata/data/parquet-bloom-filtering.parquet' and checks whether
        the Parquet Bloom filter header and bitset are identical.
      - 'test_fallback_from_dict' tests falling back from dict encoding to
        plain and using Bloom filters.
      - 'test_fallback_from_dict_if_no_bloom_tbl_props' tests falling back
        from dict encoding to plain when Bloom filters are NOT enabled.
    
    Change-Id: Ie865efd4f0c11b9e111fb94f77d084bf6ee20792
    Reviewed-on: http://gerrit.cloudera.org:8080/17262
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Tested-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc                     |   4 +
 be/src/exec/hdfs-table-sink.h                      |   8 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   2 +-
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 247 ++++++++++++++++
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   5 +
 be/src/exec/parquet/parquet-bloom-filter-util.cc   |  10 +
 be/src/exec/parquet/parquet-bloom-filter-util.h    |   3 +
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   4 +-
 be/src/util/CMakeLists.txt                         |  26 ++
 be/src/util/debug-util.cc                          |   2 +-
 be/src/util/debug-util.h                           |   1 +
 be/src/util/dict-encoding.h                        |  14 +
 be/src/util/parquet-bloom-filter-avx2.cc           |  92 ++++++
 be/src/util/parquet-bloom-filter-test.cc           |   4 +-
 be/src/util/parquet-bloom-filter.cc                |  85 +++---
 be/src/util/parquet-bloom-filter.h                 |  57 ++--
 common/thrift/DataSinks.thrift                     |   4 +
 common/thrift/ImpalaService.thrift                 |   9 +
 common/thrift/Query.thrift                         |  18 ++
 .../org/apache/impala/planner/HdfsTableSink.java   |  79 ++++++
 .../ParquetBloomFilterTblPropParserTest.java       |  88 ++++++
 tests/query_test/test_parquet_bloom_filter.py      | 310 +++++++++++++++++++--
 23 files changed, 997 insertions(+), 82 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 7d7841f..02099fb 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -104,6 +104,10 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sin
   if (hdfs_sink.__isset.external_output_partition_depth) {
     external_output_partition_depth_ = hdfs_sink.external_output_partition_depth;
   }
+
+  if (hdfs_sink.__isset.parquet_bloom_filter_col_info) {
+    parquet_bloom_filter_columns_ = hdfs_sink.parquet_bloom_filter_col_info;
+  }
 }
 
 Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2263531..13d4486 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -115,6 +115,10 @@ class HdfsTableSink : public DataSink {
   TSortingOrder::type sorting_order() const { return sorting_order_; }
   const HdfsTableDescriptor& TableDesc() { return *table_desc_; }
 
+  const std::map<string, int64_t>& GetParquetBloomFilterColumns() const {
+    return parquet_bloom_filter_columns_;
+  }
+
   RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; }
   RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; }
   RuntimeProfile::Counter* encode_timer() { return encode_timer_; }
@@ -274,6 +278,10 @@ class HdfsTableSink : public DataSink {
   // sink which directories are pre-created.
   int external_output_partition_depth_ = 0;
 
+  /// Map from column names to Parquet Bloom filter bitset sizes. Columns for which
+  /// Parquet Bloom filtering is not enabled are not listed.
+  std::map<std::string, int64_t> parquet_bloom_filter_columns_;
+
   /// string representation of the unique fragment instance id. Used for per-partition
   /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare();
   std::string unique_id_str_;
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index ff34eee..f804e36 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2065,7 +2065,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup&
 
       // Construct ParquetBloomFilter instance.
       ParquetBloomFilter bloom_filter;
-      RETURN_IF_ERROR(bloom_filter.Init(data_buffer.buffer(), data_buffer.Size()));
+      RETURN_IF_ERROR(bloom_filter.Init(data_buffer.buffer(), data_buffer.Size(), false));
 
       const uint64_t hash = col_idx_to_hash.second;
       if (!bloom_filter.Find(hash)) {
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 2069f42..01ab616 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -17,12 +17,14 @@
 
 #include "exec/parquet/hdfs-parquet-table-writer.h"
 
+#include <boost/algorithm/string.hpp>
 #include <boost/unordered_set.hpp>
 
 #include "common/version.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/parquet/parquet-column-stats.inline.h"
 #include "exec/parquet/parquet-metadata-utils.h"
+#include "exec/parquet/parquet-bloom-filter-util.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "rpc/thrift-util.h"
@@ -32,6 +34,7 @@
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "runtime/scoped-buffer.h"
 #include "runtime/string-value.inline.h"
 #include "util/bit-stream-utils.h"
 #include "util/bit-util.h"
@@ -40,11 +43,13 @@
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
+#include "util/parquet-bloom-filter.h"
 #include "util/pretty-printer.h"
 #include "util/rle-encoding.h"
 #include "util/string-util.h"
 
 #include <sstream>
+#include <string>
 
 #include "gen-cpp/ImpalaService_types.h"
 
@@ -307,6 +312,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // the expression to an int64.
   virtual void* ConvertValue(void* value) { return value; }
 
+  // Some subclasses may write a ParquetBloomFilter, in which case they should override
+  // this method.
+  virtual const ParquetBloomFilter* GetParquetBloomFilter() const {
+    return nullptr;
+  }
+
   // Encodes out all data for the current page and updates the metadata.
   virtual Status FinalizeCurrentPage() WARN_UNUSED_RESULT;
 
@@ -430,9 +441,34 @@ class HdfsParquetTableWriter::ColumnWriter :
       const Codec::CodecInfo& codec_info, const std::string& col_name)
     : BaseColumnWriter(parent, eval, codec_info, col_name),
       num_values_since_dict_size_check_(0),
+      parquet_bloom_filter_bytes_(0),
+      parquet_bloom_filter_buffer_(table_sink_mem_tracker_),
       plain_encoded_value_size_(
           ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
     DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
+
+    const std::map<string, int64_t>& col_to_size =
+      parent->parent_->GetParquetBloomFilterColumns();
+    const auto it = col_to_size.find(column_name());
+
+    if (GetBloomFilterWriteOption() == TParquetBloomFilterWrite::NEVER
+        || it == col_to_size.end()) {
+      // Parquet Bloom filtering is disabled altogether or is not turned on for this
+      // column.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::DISABLED;
+    } else {
+      // Parquet Bloom filtering is enabled for this column either immediately or if
+      // falling back from dict encoding.
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::UNINITIALIZED;
+
+      // It is the responsibility of the FE to enforce the below constraints.
+      parquet_bloom_filter_bytes_ = it->second;
+      DCHECK_LE(parquet_bloom_filter_bytes_, ParquetBloomFilter::MAX_BYTES);
+      DCHECK_GE(parquet_bloom_filter_bytes_, ParquetBloomFilter::MIN_BYTES);
+      DCHECK(BitUtil::IsPowerOf2(parquet_bloom_filter_bytes_));
+    }
+    parquet_type_ = ParquetMetadataUtils::ConvertInternalToParquetType(type().type,
+        parent_->timestamp_type_);
   }
 
   virtual void Reset() {
@@ -450,6 +486,16 @@ class HdfsParquetTableWriter::ColumnWriter :
     page_stats_base_ = page_stats_.get();
     row_group_stats_.reset(
         new ColumnStats<T>(parent_->per_file_mem_pool_.get(), plain_encoded_value_size_));
+    if (parquet_bloom_filter_state_ != ParquetBloomFilterState::DISABLED) {
+      Status status = InitParquetBloomFilter();
+      if (!status.ok()) {
+        VLOG(google::WARNING)
+            << "Failed to initialise Parquet Bloom filter for column "
+            << column_name() << "."
+            << " Error message: " << status.msg().msg();
+        ReleaseParquetBloomFilterResources();
+      }
+    }
     row_group_stats_base_ = row_group_stats_.get();
   }
 
@@ -467,6 +513,7 @@ class HdfsParquetTableWriter::ColumnWriter :
       // If the dictionary contains the maximum number of values, switch to plain
       // encoding for the next page. The current page is full and must be written out.
       if (UNLIKELY(*bytes_needed < 0)) {
+        FlushDictionaryToParquetBloomFilterIfNeeded();
         next_page_encoding_ = parquet::Encoding::PLAIN;
         return false;
       }
@@ -497,9 +544,15 @@ class HdfsParquetTableWriter::ColumnWriter :
     }
 
     page_stats_->Update(*val);
+    UpdateParquetBloomFilterIfNeeded(val);
+
     return true;
   }
 
+  virtual const ParquetBloomFilter* GetParquetBloomFilter() const {
+    return parquet_bloom_filter_.get();
+  }
+
  private:
   // The period, in # of rows, to check the estimated dictionary page size against
   // the data page size. We want to start a new data page when the estimated size
@@ -525,10 +578,163 @@ class HdfsParquetTableWriter::ColumnWriter :
   // Tracks statistics per row group. This gets reset when starting a new row group.
   scoped_ptr<ColumnStats<T>> row_group_stats_;
 
+
+  enum struct ParquetBloomFilterState {
+    /// Parquet Bloom filtering is turned off either completely or for this column.
+    DISABLED,
+
+    /// The Parquet Bloom filter needs to be initialised before being used.
+    UNINITIALIZED,
+
+    /// The Bloom filter has been initialised but it is not being used as the dictionary
+    /// can hold all elements. If the dictionary becomes full and there are still new
+    /// elements, we fall back from dictionary encoding to plain encoding and start using
+    /// the Bloom filter.
+    WAIT_FOR_FALLBACK_FROM_DICT,
+
+    /// The Parquet Bloom filter is being used.
+    ENABLED,
+
+    /// An error occurred with the Parquet Bloom filter so we are not using it.
+    FAILED
+  };
+
+  ParquetBloomFilterState parquet_bloom_filter_state_;
+  uint64_t parquet_bloom_filter_bytes_;
+  ScopedBuffer parquet_bloom_filter_buffer_;
+
+  // The parquet type corresponding to this->type(). Needed by the Parquet Bloom filter.
+  parquet::Type::type parquet_type_;
+
+  // Buffer used when converting values to the form that is used for hashing and insertion
+  // into the ParquetBloomFilter. The conversion function, 'BytesToParquetType' requires a
+  // vector to be able to allocate space if necessary. However, by caching the allocated
+  // buffer here we avoid the overhead of allocation for every conversion - when
+  // 'BytesToParquetType' calls 'resize' on the vector it will already have at least the
+  // desired length in most (or all) cases.
+  //
+  // We preallocate 16 bytes because that is the longest fixed size type (except for fixed
+  // length arrays).
+  std::vector<uint8_t> parquet_bloom_conversion_buffer{16, 0};
+
+  // The ParquetBloomFilter object if one is being written. If
+  // 'ShouldInitParquetBloomFilter()' is false, the combination of the impala type and the
+  // parquet type is not supported or some error occurs during the initialisation of the
+  // ParquetBloomFilter object, it is set to NULL.
+  unique_ptr<ParquetBloomFilter> parquet_bloom_filter_;
+
   // Converts a slot pointer to a raw value suitable for encoding
   inline T* CastValue(void* value) {
     return reinterpret_cast<T*>(value);
   }
+
+  Status InitParquetBloomFilter() WARN_UNUSED_RESULT {
+    DCHECK(parquet_bloom_filter_state_ != ParquetBloomFilterState::DISABLED);
+    const ColumnType& impala_type = type();
+    if (!IsParquetBloomFilterSupported(parquet_type_, impala_type)) {
+      stringstream ss;
+      ss << "Parquet Bloom filtering not supported for parquet type " << parquet_type_
+          << " and impala type " << impala_type << ".";
+      return Status::Expected(ss.str());
+    }
+
+    parquet_bloom_filter_buffer_.Release();
+    if (!parquet_bloom_filter_buffer_.TryAllocate(parquet_bloom_filter_bytes_)) {
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
+            "Bloom filter data when writing column '$1'.",
+            parquet_bloom_filter_bytes_, column_name()));
+    }
+    std::memset(parquet_bloom_filter_buffer_.buffer(), 0,
+        parquet_bloom_filter_buffer_.Size());
+
+    parquet_bloom_filter_ = make_unique<ParquetBloomFilter>();
+    Status status = parquet_bloom_filter_->Init(parquet_bloom_filter_buffer_.buffer(),
+          parquet_bloom_filter_buffer_.Size(), true);
+
+    if (!status.ok()) {
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+      return status;
+    } else {
+      const bool should_wait_for_fallback =
+          (GetBloomFilterWriteOption() == TParquetBloomFilterWrite::IF_NO_DICT)
+          && IsDictionaryEncoding(current_encoding_);
+      parquet_bloom_filter_state_ = should_wait_for_fallback
+          ? ParquetBloomFilterState::WAIT_FOR_FALLBACK_FROM_DICT
+          : ParquetBloomFilterState::ENABLED;
+      return Status::OK();
+    }
+  }
+
+  void UpdateParquetBloomFilterIfNeeded(const void* val) {
+    if (parquet_bloom_filter_state_ == ParquetBloomFilterState::ENABLED) {
+      Status status = UpdateParquetBloomFilter(val);
+      if (!status.ok()) {
+        // If an error happens, for example conversion to the form expected by the Bloom
+        // filter fails, we stop writing the Bloom filter and release resources associated
+        // with it.
+        VLOG(google::WARNING)
+            << "An error happened updating Parquet Bloom filter in column "
+            << column_name() << " at row idx " << parent_->row_idx_ << "."
+            << " Error message: " << status.msg().msg();
+        parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+        ReleaseParquetBloomFilterResources();
+      }
+    }
+  }
+
+  Status UpdateParquetBloomFilter(const void* val) WARN_UNUSED_RESULT {
+    DCHECK(parquet_bloom_filter_state_ == ParquetBloomFilterState::ENABLED);
+    DCHECK(parquet_bloom_filter_ != nullptr);
+
+    uint8_t* ptr = nullptr;
+    size_t len = -1;
+    const ColumnType& impala_type = type();
+    RETURN_IF_ERROR(BytesToParquetType(val, impala_type, parquet_type_,
+        &parquet_bloom_conversion_buffer, &ptr, &len));
+    DCHECK(ptr != nullptr);
+    DCHECK(len != -1);
+    parquet_bloom_filter_->HashAndInsert(ptr, len);
+
+    return Status::OK();
+  }
+
+  void ReleaseParquetBloomFilterResources() {
+    parquet_bloom_filter_ = nullptr;
+    parquet_bloom_filter_buffer_.Release();
+  }
+
+  void FlushDictionaryToParquetBloomFilterIfNeeded() {
+    if (parquet_bloom_filter_state_
+        == ParquetBloomFilterState::WAIT_FOR_FALLBACK_FROM_DICT) {
+      parquet_bloom_filter_state_ = ParquetBloomFilterState::ENABLED;
+
+      // Write dictionary keys to Parquet Bloom filter if we haven't been filling it so
+      // far (and Bloom filtering is enabled). If there are too many values for a
+      // dictionary, a Bloom filter may still be useful.
+      Status status = DictKeysToParquetBloomFilter();
+      if (!status.ok()) {
+        VLOG(google::WARNING)
+            << "Failed to add dictionary keys to Parquet Bloom filter for column "
+            << column_name()
+            << " when falling back from dictionary encoding to plain encoding."
+            << " Error message: " << status.msg().msg();
+        parquet_bloom_filter_state_ = ParquetBloomFilterState::FAILED;
+        ReleaseParquetBloomFilterResources();
+      }
+    }
+  }
+
+  Status DictKeysToParquetBloomFilter() {
+    return dict_encoder_->ForEachDictKey([this](const T& value) {
+        return UpdateParquetBloomFilter(&value);
+        });
+  }
+
+  TParquetBloomFilterWrite::type GetBloomFilterWriteOption() {
+    return parent_->state_->query_options().parquet_bloom_filter_write;
+  }
+
  protected:
   // Size of each encoded value in plain encoding. -1 if the type is variable-length.
   int64_t plain_encoded_value_size_;
@@ -1386,6 +1592,43 @@ Status HdfsParquetTableWriter::WriteFileHeader() {
   return Status::OK();
 }
 
+Status HdfsParquetTableWriter::WriteParquetBloomFilter(BaseColumnWriter* col_writer,
+    parquet::ColumnMetaData* meta_data) {
+  DCHECK(col_writer != nullptr);
+  DCHECK(meta_data != nullptr);
+
+  const ParquetBloomFilter* bloom_filter = col_writer->GetParquetBloomFilter();
+  if (bloom_filter == nullptr || bloom_filter->AlwaysFalse()) {
+    // If there is no Bloom filter for this column or if it is empty we don't need to do
+    // anything.
+    // If bloom_filter->AlwaysFalse() is true, it means the Bloom filter was initialised
+    // but no element was inserted, probably because we have not fallen back to plain
+    // encoding from dictionary encoding.
+    return Status::OK();
+  }
+
+  // Update metadata.
+  meta_data->__set_bloom_filter_offset(file_pos_);
+
+  // Write the header to the file.
+  parquet::BloomFilterHeader header = CreateBloomFilterHeader(*bloom_filter);
+  uint8_t* buffer = nullptr;
+  uint32_t len = 0;
+  RETURN_IF_ERROR(thrift_serializer_->SerializeToBuffer(&header, &len, &buffer));
+  DCHECK(buffer != nullptr);
+  DCHECK_GT(len, 0);
+  RETURN_IF_ERROR(Write(buffer, len));
+  file_pos_ += len;
+
+  // Write the Bloom filter directory (bitset) to the file.
+  const uint8_t* directory = bloom_filter->directory();
+  const int64_t directory_size = bloom_filter->directory_size();
+  RETURN_IF_ERROR(Write(directory, directory_size));
+  file_pos_ += directory_size;
+
+  return Status::OK();
+}
+
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   if (current_row_group_ == nullptr) return Status::OK();
 
@@ -1442,6 +1685,10 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     // Build column statistics and add them to the header.
     col_writer->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
 
+    // Write Bloom filter and update metadata.
+    RETURN_IF_ERROR(WriteParquetBloomFilter(col_writer,
+        &current_row_group_->columns[i].meta_data));
+
     // Since we don't supported complex schemas, all columns should have the same
     // number of values.
     DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index aadad1f..cdc6c08 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -149,6 +149,11 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// Writes the file metadata and footer.
   Status WriteFileFooter();
 
+  /// Writes the ParquetBloomFilter of 'col_writer' if it has one, including the header,
+  /// and updates '*meta_data'.
+  Status WriteParquetBloomFilter(BaseColumnWriter* col_writer,
+      parquet::ColumnMetaData* meta_data) WARN_UNUSED_RESULT;
+
   /// Flushes the current row group to file.  This will compute the final
   /// offsets of column chunks, updating the file metadata.
   Status FlushCurrentRowGroup();
diff --git a/be/src/exec/parquet/parquet-bloom-filter-util.cc b/be/src/exec/parquet/parquet-bloom-filter-util.cc
index 08893c2..002c98f 100644
--- a/be/src/exec/parquet/parquet-bloom-filter-util.cc
+++ b/be/src/exec/parquet/parquet-bloom-filter-util.cc
@@ -216,4 +216,14 @@ Status LiteralToParquetType(const Literal& literal, ScalarExprEvaluator* eval,
   return BytesToParquetType(value, literal.type(), parquet_type, storage, ptr, len);
 }
 
+parquet::BloomFilterHeader CreateBloomFilterHeader(const ParquetBloomFilter& bloom_filter)
+{
+  parquet::BloomFilterHeader header;
+  header.algorithm.__set_BLOCK(parquet::SplitBlockAlgorithm());
+  header.hash.__set_XXHASH(parquet::XxHash());
+  header.compression.__set_UNCOMPRESSED(parquet::Uncompressed());
+  header.__set_numBytes(bloom_filter.directory_size());
+  return header;
+}
+
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-bloom-filter-util.h b/be/src/exec/parquet/parquet-bloom-filter-util.h
index 911be1b..a3f4c84 100644
--- a/be/src/exec/parquet/parquet-bloom-filter-util.h
+++ b/be/src/exec/parquet/parquet-bloom-filter-util.h
@@ -57,5 +57,8 @@ Status LiteralToParquetType(const Literal& literal, ScalarExprEvaluator* eval,
     const parquet::Type::type& parquet_type, vector<uint8_t>* storage,
     uint8_t** ptr, size_t* len) WARN_UNUSED_RESULT;
 
+/// Creates a 'parquet::BloomFilterHeader' object based on 'bloom_filter'.
+parquet::BloomFilterHeader CreateBloomFilterHeader(
+    const ParquetBloomFilter& bloom_filter);
 
 } // namespace impala
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 5ba518a..e92e81f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -514,6 +514,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_parquet_bloom_filtering(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE: {
+        TParquetBloomFilterWrite::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "Parquet Bloom filter write",
+           _TParquetBloomFilterWrite_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_parquet_bloom_filter_write(enum_type);
+        break;
+      }
       case TImpalaQueryOptions::PARQUET_READ_STATISTICS: {
         query_options->__set_parquet_read_statistics(IsTrue(value));
         break;
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 943e906..65113cb 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS + 1);\
+      TImpalaQueryOptions::PARQUET_BLOOM_FILTER_WRITE + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -257,6 +257,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 168112a..6c7643f 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -110,6 +110,32 @@ set(UTIL_SRCS
   ${MUSTACHE_SRC_DIR}/mustache.cc
   ${MPFIT_SRC_DIR}/mpfit.c)
 
+# Detect AVX2 support
+set(AVX2_CMD "echo | ${CMAKE_CXX_COMPILER} -mavx2 -dM -E - | awk '$2 == \"__AVX2__\" { print $3 }'")
+execute_process(
+  COMMAND bash -c ${AVX2_CMD}
+  OUTPUT_VARIABLE AVX2_SUPPORT
+  OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+# parquet-bloom-filter-avx2.cc uses AVX2 operations.
+if (AVX2_SUPPORT)
+  list(APPEND UTIL_SRCS parquet-bloom-filter-avx2.cc)
+
+  set_source_files_properties(parquet-bloom-filter-avx2.cc PROPERTIES COMPILE_FLAGS "-mavx2")
+  # parquet-bloom-filter.cc is not compiled explicitly with AVX2
+  # instructions (-mavx2) but it needs to know at compile time whether AVX2 support is
+  # available, hence the custom definition instead of relying on __AVX2__ defined by
+  # compiler with -mavx2.
+  # This is because it derives from Kudu code at
+  # be/src/kudu/util/block_bloom_filter_avx2.cc.
+  set_source_files_properties(parquet-bloom-filter-avx2.cc parquet-bloom-filter.cc
+                              PROPERTIES COMPILE_DEFINITIONS "USE_AVX2=1")
+  message("Compiler supports AVX2")
+else()
+  message("Compiler does not support AVX2")
+endif()
+
 add_library(Util ${UTIL_SRCS})
 add_dependencies(Util gen-deps gen_ir_descriptions)
 
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 8dd6814..2c3f5be 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -105,7 +105,7 @@ PRINT_THRIFT_ENUM_IMPL(TEnabledRuntimeFilterTypes)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilteringLevel)
 PRINT_THRIFT_ENUM_IMPL(TKuduReplicaSelection)
 PRINT_THRIFT_ENUM_IMPL(TMinmaxFilterFastCodePathMode)
-
+PRINT_THRIFT_ENUM_IMPL(TParquetBloomFilterWrite)
 
 string PrintId(const TUniqueId& id, const string& separator) {
   stringstream out;
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index 2cd0ae3..edec713 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -83,6 +83,7 @@ std::string PrintThriftEnum(const TEnabledRuntimeFilterTypes::type& value);
 std::string PrintThriftEnum(const TMinmaxFilteringLevel::type& value);
 std::string PrintThriftEnum(const TKuduReplicaSelection::type& value);
 std::string PrintThriftEnum(const TMinmaxFilterFastCodePathMode::type& value);
+std::string PrintThriftEnum(const TParquetBloomFilterWrite::type& value);
 
 std::string PrintTuple(const Tuple* t, const TupleDescriptor& d);
 std::string PrintRow(TupleRow* row, const RowDescriptor& d);
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index e6e01bc..3a33801 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -188,6 +188,20 @@ class DictEncoder : public DictEncoderBase {
 
   virtual int num_entries() const { return nodes_.size(); }
 
+  /// Execute 'func' for each key that is present in the dictionary. Stops execution the
+  /// first time 'func' returns an error, propagating the error. Returns OK otherwise.
+  ///
+  /// Can be useful if we fall back to plain encoding from dict encoding but still want to
+  /// use a Bloom filter. In this case the filter can be filled with all elements that
+  /// have occurred so far.
+  Status ForEachDictKey(const std::function<Status(const T&)>& func) {
+    for (const auto& node : nodes_) {
+      RETURN_IF_ERROR(func(node.value));
+    }
+
+    return Status::OK();
+  }
+
  private:
   /// Size of the table. Must be a power of 2.
   enum { HASH_TABLE_SIZE = 1 << 16 };
diff --git a/be/src/util/parquet-bloom-filter-avx2.cc b/be/src/util/parquet-bloom-filter-avx2.cc
new file mode 100644
index 0000000..52fb978
--- /dev/null
+++ b/be/src/util/parquet-bloom-filter-avx2.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file is partially a copy of Kudu BlockBloomFilter code. We wanted to reuse the
+// existing implementation but also extend/modify some parts. This would not have been
+// possible without modifying the Kudu source code in Impala
+// (be/src/kudu/util/block_bloom_filter*). On the other hand, we have to maintain binary
+// compatibility between the Kudu code in Impala and actual Kudu code, so we decided
+// against modifying the code in be/src/kudu/util/block_bloom_filter*.
+
+// This file is conditionally compiled if the compiler supports AVX2. However, the tidy
+// bot appears to compile this file regardless and does not define the USE_AVX2 macro,
+// raising incorrect errors.
+#if defined(CLANG_TIDY)
+#define USE_AVX2 1
+#endif
+
+#include "util/parquet-bloom-filter.h"
+
+#include <immintrin.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+
+namespace impala {
+
+// A static helper function for the AVX2 methods. Turns a 32-bit hash into a 256-bit
+// Bucket with a single 1-bit set in each 32-bit lane.
+static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i
+MakeMask(const uint32_t hash) {
+  const __m256i ones = _mm256_set1_epi32(1);
+  const __m256i rehash = _mm256_setr_epi32(BLOOM_HASH_CONSTANTS);
+  // Load hash into a YMM register, repeated eight times
+  __m256i hash_data = _mm256_set1_epi32(hash);
+  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
+  // odd constants, then keep the 5 most significant bits from each product.
+  hash_data = _mm256_mullo_epi32(rehash, hash_data);
+  hash_data = _mm256_srli_epi32(hash_data, 27);
+  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
+  return _mm256_sllv_epi32(ones, hash_data);
+}
+
+void ParquetBloomFilter::BucketInsertAVX2(const uint32_t bucket_idx,
+    const uint32_t hash) noexcept {
+  const __m256i mask = MakeMask(hash);
+  __m256i* const bucket = &(reinterpret_cast<__m256i*>(directory_)[bucket_idx]);
+  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
+  // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
+  // don't have to save them off before using XMM registers.
+  _mm256_zeroupper();
+}
+
+bool ParquetBloomFilter::BucketFindAVX2(const uint32_t bucket_idx,
+    const uint32_t hash) const noexcept {
+  const __m256i mask = MakeMask(hash);
+  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+  // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
+  // takes the negation of its first argument and ands that with its second argument. In
+  // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
+  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
+  const bool result = _mm256_testc_si256(bucket, mask);
+  _mm256_zeroupper();
+  return result;
+}
+
+} // namespace impala
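For readers unfamiliar with the AVX2 intrinsics, the mask construction in MakeMask() and the bucket operations built on it can be written in scalar form. The Python sketch below is illustrative only and is not part of the patch. The function names are hypothetical, and the eight salt values are an assumption: the diff only refers to them as BLOOM_HASH_CONSTANTS, and the values here are the ones I believe the Parquet Bloom filter specification uses.

```python
# Assumed values of BLOOM_HASH_CONSTANTS (not shown in this diff); taken to be
# the eight odd salt constants of the Parquet split-block Bloom filter.
SALT = [0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
        0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31]


def make_mask(hash32):
    """Return eight 32-bit words, each with exactly one bit set."""
    mask = []
    for salt in SALT:
        product = (hash32 * salt) & 0xFFFFFFFF  # keep the low 32 bits of the product
        bit = product >> 27                     # the 5 most significant bits: 0..31
        mask.append(1 << bit)
    return mask


def bucket_insert(bucket, hash32):
    """OR the mask into a bucket, which is a list of eight 32-bit words."""
    for lane, word in enumerate(make_mask(hash32)):
        bucket[lane] |= word


def bucket_find(bucket, hash32):
    """True if the bucket has a one wherever the mask has a one."""
    return all((bucket[lane] & word) == word
               for lane, word in enumerate(make_mask(hash32)))
```

A lookup on an all-zero bucket returns False, and a lookup for a previously inserted hash returns True; other hashes may yield false positives, as with any Bloom filter.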
diff --git a/be/src/util/parquet-bloom-filter-test.cc b/be/src/util/parquet-bloom-filter-test.cc
index 96570b1..7ae3449 100644
--- a/be/src/util/parquet-bloom-filter-test.cc
+++ b/be/src/util/parquet-bloom-filter-test.cc
@@ -67,7 +67,7 @@ BloomWrapper CreateBloomFilter(int ndv, double fpp) {
   BloomWrapper res;
   res.bloom = std::make_unique<ParquetBloomFilter>();
   res.storage = std::make_unique<vector<uint8_t>>(storage_size, 0);
-  Status status = res.bloom->Init(res.storage->data(), storage_size);
+  Status status = res.bloom->Init(res.storage->data(), storage_size, true);
   EXPECT_TRUE(status.ok()) << status.GetDetail();
   return res;
 }
@@ -113,7 +113,7 @@ TEST(ParquetBloomFilter, Find) {
   }
 }
 
-TEST(ParquetBloomFilter, TestHashAndFind) {
+TEST(ParquetBloomFilter, HashAndFind) {
   srand(0);
   for (int ndv = 100; ndv <= 100000; ndv *= 10) {
     BloomWrapper wrapper = CreateBloomFilter(ndv, 0.01);
diff --git a/be/src/util/parquet-bloom-filter.cc b/be/src/util/parquet-bloom-filter.cc
index 93116b9..d1f9366 100644
--- a/be/src/util/parquet-bloom-filter.cc
+++ b/be/src/util/parquet-bloom-filter.cc
@@ -41,6 +41,13 @@
 using namespace std;
 using strings::Substitute;
 
+// TODO: Reconcile with legacy AVX support.
+DEFINE_bool(disable_parquetbloomfilter_avx2, false,
+    "Disable AVX2 operations in ParquetBloomFilter. This flag has no effect if the "
+    "target CPU doesn't support AVX2 at run-time or ParquetBloomFilter was built with "
+    "a compiler that doesn't support AVX2.");
+DECLARE_bool(enable_legacy_avx_support);
+
 namespace impala {
 
 // This is needed to avoid undefined reference errors.
@@ -51,12 +58,28 @@ constexpr uint32_t ParquetBloomFilter::SALT[8] __attribute__((aligned(32)));
 ParquetBloomFilter::ParquetBloomFilter() :
   log_num_buckets_(0),
   directory_mask_(0),
-  directory_(nullptr) {
+  directory_(nullptr),
+  always_false_(false) {
+#ifdef USE_AVX2
+  if (has_avx2()) {
+    bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsertAVX2;
+    bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFindAVX2;
+  } else {
+    bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsert;
+    bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFind;
+  }
+#else
+  bucket_insert_func_ptr_ = &ParquetBloomFilter::BucketInsert;
+  bucket_find_func_ptr_ = &ParquetBloomFilter::BucketFind;
+#endif
+
+  DCHECK(bucket_insert_func_ptr_);
+  DCHECK(bucket_find_func_ptr_);
 }
 
 ParquetBloomFilter::~ParquetBloomFilter() {}
 
-Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
+Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size, bool always_false) {
   const int log_space_bytes = std::log2(dir_size);
   DCHECK_EQ(1ULL << log_space_bytes, dir_size);
 
@@ -70,8 +93,16 @@ Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
           log_space_bytes));
   }
   DCHECK_EQ(directory_size(), dir_size);
+  DCHECK(directory != nullptr);
   directory_ = reinterpret_cast<Bucket*>(directory);
 
+  if (always_false) {
+    // Check the assumption that the directory is empty.
+    DCHECK(std::all_of(directory, directory + dir_size,
+          [](uint8_t byte) { return byte == 0; }));
+    always_false_ = true;
+  }
+
   // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift
   // that is too large.
   directory_mask_ = (1ULL << log_num_buckets_) - 1;
@@ -79,9 +110,11 @@ Status ParquetBloomFilter::Init(uint8_t* directory, size_t dir_size) {
 }
 
 void ParquetBloomFilter::Insert(const uint64_t hash) noexcept {
+  always_false_ = false;
   uint32_t idx = DetermineBucketIdx(hash);
   uint32_t hash_lower = hash;
-  BucketInsert(idx, hash_lower);
+  DCHECK(bucket_insert_func_ptr_);
+  (this->*bucket_insert_func_ptr_)(idx, hash_lower);
 }
 
 void ParquetBloomFilter::HashAndInsert(const uint8_t* input, size_t size) noexcept {
@@ -90,9 +123,11 @@ void ParquetBloomFilter::HashAndInsert(const uint8_t* input, size_t size) noexce
 }
 
 bool ParquetBloomFilter::Find(const uint64_t hash) const noexcept {
+  if (always_false_) return false;
   uint32_t idx = DetermineBucketIdx(hash);
   uint32_t hash_lower = hash;
-  return BucketFind(idx, hash_lower);
+  DCHECK(bucket_find_func_ptr_);
+  return (this->*bucket_find_func_ptr_)(idx, hash_lower);
 }
 
 bool ParquetBloomFilter::HashAndFind(const uint8_t* input, size_t size) const noexcept {
@@ -135,7 +170,6 @@ uint64_t ParquetBloomFilter::Hash(const uint8_t* input, size_t size) {
   return hash;
 }
 
-#ifdef __aarch64__
 ATTRIBUTE_NO_SANITIZE_INTEGER
 void ParquetBloomFilter::BucketInsert(const uint32_t bucket_idx,
     const uint32_t hash) noexcept {
@@ -170,45 +204,10 @@ bool ParquetBloomFilter::BucketFind(
   }
   return true;
 }
-#else
-// A static helper function for the AVX2 methods. Turns a 32-bit hash into a 256-bit
-// Bucket with 1 single 1-bit set in each 32-bit lane.
-static inline ATTRIBUTE_ALWAYS_INLINE __attribute__((__target__("avx2"))) __m256i
-MakeMask(const uint32_t hash) {
-  const __m256i ones = _mm256_set1_epi32(1);
-  const __m256i rehash = _mm256_setr_epi32(BLOOM_HASH_CONSTANTS);
-  // Load hash into a YMM register, repeated eight times
-  __m256i hash_data = _mm256_set1_epi32(hash);
-  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different
-  // odd constants, then keep the 5 most significant bits from each product.
-  hash_data = _mm256_mullo_epi32(rehash, hash_data);
-  hash_data = _mm256_srli_epi32(hash_data, 27);
-  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
-  return _mm256_sllv_epi32(ones, hash_data);
-}
-
-void ParquetBloomFilter::BucketInsert(const uint32_t bucket_idx,
-    const uint32_t hash) noexcept {
-  const __m256i mask = MakeMask(hash);
-  __m256i* const bucket = &(reinterpret_cast<__m256i*>(directory_)[bucket_idx]);
-  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
-  // For SSE compatibility, unset the high bits of each YMM register so SSE instructions
-  // dont have to save them off before using XMM registers.
-  _mm256_zeroupper();
-}
 
-bool ParquetBloomFilter::BucketFind(const uint32_t bucket_idx,
-    const uint32_t hash) const noexcept {
-  const __m256i mask = MakeMask(hash);
-  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256
-  // takes the negation of its first argument and ands that with its second argument. In
-  // our case, the result is zero everywhere iff there is a one in 'bucket' wherever
-  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise.
-  const bool result = _mm256_testc_si256(bucket, mask);
-  _mm256_zeroupper();
-  return result;
+bool ParquetBloomFilter::has_avx2() {
+  return !FLAGS_disable_parquetbloomfilter_avx2 && !FLAGS_enable_legacy_avx_support
+      && CpuInfo::IsSupported(CpuInfo::AVX2);
 }
-#endif // #ifdef __aarch64__
 
 } // namespace impala
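The two ideas introduced in this file, selecting the bucket implementation once in the constructor and short-circuiting lookups while the filter is known to be empty, can be illustrated with a toy model. The Python sketch below is an assumption-laden stand-in, not the real data layout: the class name, the modulo-based bucket selection and the single-bit encoding are all hypothetical and chosen only to keep the example short.

```python
class SketchFilter:
    """Toy stand-in for ParquetBloomFilter; does not use the real bitset layout."""

    def __init__(self, num_buckets, use_fast_path):
        self.buckets = [0] * num_buckets
        # Models Init(..., always_false = true) on a zero-filled directory.
        self.always_false = True
        # Pick the implementation once, so insert() does not branch on it later.
        if use_fast_path:
            self._bucket_insert = self._bucket_insert_fast
        else:
            self._bucket_insert = self._bucket_insert_plain

    def _bucket_insert_plain(self, idx, h):
        self.buckets[idx] |= 1 << (h % 32)

    def _bucket_insert_fast(self, idx, h):
        # A real fast path would use SIMD; here it only has to give the same result.
        self.buckets[idx] |= 1 << (h % 32)

    def insert(self, h):
        self.always_false = False  # the filter is no longer known to be empty
        self._bucket_insert(h % len(self.buckets), h)

    def find(self, h):
        if self.always_false:
            return False  # nothing was inserted, skip the bucket lookup
        return bool(self.buckets[h % len(self.buckets)] & (1 << (h % 32)))
```

Both paths must produce identical directories, which is what allows the choice to be made purely on CPU capability and flags.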
diff --git a/be/src/util/parquet-bloom-filter.h b/be/src/util/parquet-bloom-filter.h
index dfc6e91..c3404d0 100644
--- a/be/src/util/parquet-bloom-filter.h
+++ b/be/src/util/parquet-bloom-filter.h
@@ -38,7 +38,10 @@ class ParquetBloomFilter {
   /// Initialises the directory (bitset) of the Bloom filter. The data is not copied and
   /// is not owned by this object. The buffer must be valid as long as this object uses
   /// it.
-  Status Init(uint8_t* directory, size_t dir_size);
+  /// If 'always_false' is true, the implementation assumes that the directory is empty.
+  /// If the directory contains any bytes other than zero, 'always_false' should be
+  /// false.
+  Status Init(uint8_t* directory, size_t dir_size, bool always_false);
 
   void Insert(const uint64_t hash) noexcept;
   void HashAndInsert(const uint8_t* input, size_t size) noexcept;
@@ -48,6 +51,19 @@ class ParquetBloomFilter {
   bool Find(const uint64_t hash) const noexcept;
   bool HashAndFind(const uint8_t* input, size_t size) const noexcept;
 
+  const uint8_t* directory() const {
+    return reinterpret_cast<const uint8_t*>(directory_);
+  }
+
+  // Size of the internal directory structure in bytes.
+  int64_t directory_size() const {
+    return 1ULL << log_space_bytes();
+  }
+
+  bool AlwaysFalse() const {
+    return always_false_;
+  }
+
   static int OptimalByteSize(const size_t ndv, const double fpp);
 
   // If we expect to fill a Bloom filter with 'ndv' different unique elements and we
@@ -95,6 +111,9 @@ class ParquetBloomFilter {
   static constexpr uint32_t SALT[8]
       __attribute__((aligned(32))) = {BLOOM_HASH_CONSTANTS};
 
+  // Detect at run-time whether CPU supports AVX2
+  static bool has_avx2();
+
   // log_num_buckets_ is the log (base 2) of the number of buckets in the directory.
   int log_num_buckets_;
 
@@ -104,30 +123,36 @@ class ParquetBloomFilter {
 
   Bucket* directory_;
 
-/// AVX2 is not available on ARM so we have to implement the functions differently.
-#ifdef __aarch64__
+  // Indicates whether the Bloom filter is empty and therefore all *Find* calls will
+  // return false without further checks.
+  bool always_false_;
+
   // Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
   // into and 'hash' is the value passed to Insert().
-  void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept;
-  bool BucketFind(const uint32_t bucket_idx, const uint32_t hash) const noexcept;
-#else
-  // Same as above but using AVX2.
-  void BucketInsert(const uint32_t bucket_idx, const uint32_t hash) noexcept
-    __attribute__((__target__("avx2")));
-  bool BucketFind(const uint32_t bucket_idx, const uint32_t hash) const noexcept
-    __attribute__((__target__("avx2")));
+  void BucketInsert(uint32_t bucket_idx, uint32_t hash) noexcept;
+
+  bool BucketFind(uint32_t bucket_idx, uint32_t hash) const noexcept;
+
+#ifdef USE_AVX2
+  // A faster SIMD version of BucketInsert().
+  void BucketInsertAVX2(const uint32_t bucket_idx, const uint32_t hash) noexcept
+      __attribute__((__target__("avx2")));
+
+  // A faster SIMD version of BucketFind().
+  bool BucketFindAVX2(const uint32_t bucket_idx, const uint32_t hash) const noexcept
+      __attribute__((__target__("avx2")));
 #endif
 
+  // Function pointers initialized in the constructor to avoid run-time cost in hot-path
+  // of Find and Insert operations.
+  decltype(&ParquetBloomFilter::BucketInsert) bucket_insert_func_ptr_;
+  decltype(&ParquetBloomFilter::BucketFind) bucket_find_func_ptr_;
+
   // Returns amount of space used in log2 bytes.
   int log_space_bytes() const {
     return log_num_buckets_ + kLogBucketByteSize;
   }
 
-  // Size of the internal directory structure in bytes.
-  int64_t directory_size() const {
-    return 1ULL << log_space_bytes();
-  }
-
   uint32_t DetermineBucketIdx(const uint64_t hash) const noexcept {
     const uint64_t hash_top_bits = hash >> 32;
     const uint64_t num_buckets = 1ULL << log_num_buckets_;
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 1a9cdfe..03f6a69 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -97,6 +97,10 @@ struct THdfsTableSink {
   // Indicates how deep into the partition specification in which to start creating
   // partition directories
   10: optional i32 external_output_partition_depth;
+
+  // Mapping from column names to Parquet Bloom filter bitset sizes. Columns for which no
+  // Parquet Bloom filter should be written should not be listed here.
+  11: optional map<string, i64> parquet_bloom_filter_col_info;
 }
 
 // Structure to encapsulate specific options that are passed down to the KuduTableSink
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 94109e0..ad0d1fc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -684,6 +684,15 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to use min/max filtering on partition columns
   MINMAX_FILTER_PARTITION_COLUMNS = 133
+
+  // Controls when to write Parquet Bloom filters.
+  //     NEVER      - never write Parquet Bloom filters
+  //     IF_NO_DICT - write the Parquet Bloom filters set in the table properties if
+  //                  the row group is not fully dictionary encoded
+  //     ALWAYS     - always write the Parquet Bloom filters set in the table
+  //                  properties, even if the row group is fully dictionary encoded
+  //     The default is IF_NO_DICT.
+  PARQUET_BLOOM_FILTER_WRITE = 134
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index d061578..f5c3cd0 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -85,6 +85,20 @@ enum TMinmaxFilterFastCodePathMode {
   VERIFICATION=2
 }
 
+// Options for when to write Parquet Bloom filters for supported types.
+enum TParquetBloomFilterWrite {
+  // Never write Parquet Bloom filters.
+  NEVER,
+
+  // Write Parquet Bloom filters if specified in the table properties AND the row group
+  // is not fully dictionary encoded.
+  IF_NO_DICT,
+
+  // Always write Parquet Bloom filters if specified in the table properties,
+  // even if the row group is fully dictionary encoded.
+  ALWAYS
+}
+
 // constants for TQueryOptions.num_nodes
 const i32 NUM_NODES_ALL = 0
 const i32 NUM_NODES_ALL_RACKS = -1
@@ -528,6 +542,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   134: optional bool minmax_filter_partition_columns = true;
+
+  // See comment in ImpalaService.thrift
+  135: optional TParquetBloomFilterWrite parquet_bloom_filter_write =
+      TParquetBloomFilterWrite.IF_NO_DICT;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
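The semantics described in the TParquetBloomFilterWrite comments can be summarised as a small decision function. The Python sketch below only restates those comments; the function and parameter names are hypothetical, and the actual check in the backend writer may be structured differently.

```python
from enum import Enum


class ParquetBloomFilterWrite(Enum):
    NEVER = 0
    IF_NO_DICT = 1
    ALWAYS = 2


def should_write_bloom_filter(option, col_in_tbl_props, fully_dict_encoded):
    """Decide whether a Bloom filter is written for one column of a row group."""
    # A filter is only ever written for columns listed in the table properties.
    if option is ParquetBloomFilterWrite.NEVER or not col_in_tbl_props:
        return False
    if option is ParquetBloomFilterWrite.IF_NO_DICT:
        return not fully_dict_encoded
    return True  # ALWAYS
```

With the default IF_NO_DICT, a listed column whose row group is fully dictionary encoded gets no Bloom filter.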
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 0fc4e96..76b68a1 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -18,6 +18,8 @@
 package org.apache.impala.planner;
 
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -36,18 +38,36 @@ import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+import org.apache.impala.util.BitUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Sink for inserting into filesystem-backed tables.
  *
  * TODO(vercegovac): rename to FsTableSink
  */
 public class HdfsTableSink extends TableSink {
+  private final static Logger LOG = LoggerFactory.getLogger(HdfsTableSink.class);
+
+  // The name of the table property that sets the parameters of writing Parquet Bloom
+  // filters.
+  public static final String PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY =
+    "parquet.bloom.filter.columns";
+
+  // These constants are the maximal and minimal size of the bitset of a
+  // ParquetBloomFilter object (in the BE). These should be kept in sync with the values
+  // in be/src/util/parquet-bloom-filter.h.
+  public static final long PARQUET_BLOOM_FILTER_MAX_BYTES = 128 * 1024 * 1024;
+  public static final long PARQUET_BLOOM_FILTER_MIN_BYTES = 64;
+
   // Default number of partitions used for computeResourceProfile() in the absence of
   // column stats.
   protected final long DEFAULT_NUM_PARTITIONS = 10;
@@ -221,6 +241,55 @@ public class HdfsTableSink extends TableSink {
     return "HDFS WRITER";
   }
 
+  // The table property has the following format: a comma separated list of
+  // 'col_name:bitset_size' pairs. The 'bitset_size' part means the size of the bitset of
+  // the Bloom filter, and is optional. Values will be rounded up to the smallest power of
+  // 2 not less than the given number. If the size is not given, it will be the maximal
+  // Bloom filter size (PARQUET_BLOOM_FILTER_MAX_BYTES). No Bloom filter will be written
+  // for columns not listed here.
+  // Example: "col1:1024,col2,col4:100".
+  @VisibleForTesting
+  static Map<String, Long> parseParquetBloomFilterWritingTblProp(final String tbl_prop) {
+    Map<String, Long> result = new HashMap<>();
+    String[] colSizePairs = tbl_prop.split(",");
+    for (String colSizePair : colSizePairs) {
+      String[] tokens = colSizePair.split(":");
+
+      if (tokens.length == 0 || tokens.length > 2) {
+        String err = "Invalid token in table property "
+          + PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY + ": "
+          + colSizePair.trim()
+          + ". Expected either a column name or a column name and a size "
+          + "separated by a colon (':').";
+        LOG.warn(err);
+        return null;
+      }
+
+      long size;
+      if (tokens.length == 1) {
+        size = PARQUET_BLOOM_FILTER_MAX_BYTES;
+      } else {
+        assert tokens.length == 2;
+        try {
+          size = Long.parseLong(tokens[1].trim());
+        } catch (NumberFormatException e) {
+          String err =
+                "Invalid bitset size in table property "
+                + PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY + ": "
+                + tokens[1].trim();
+          LOG.warn(err);
+          return null;
+        }
+
+        size = Long.max(PARQUET_BLOOM_FILTER_MIN_BYTES, size);
+        size = Long.min(PARQUET_BLOOM_FILTER_MAX_BYTES, size);
+        size = BitUtil.roundUpToPowerOf2(size);
+      }
+      result.put(tokens[0].trim(), size);
+    }
+    return result;
+  }
+
   @Override
   protected void toThriftImpl(TDataSink tsink) {
     THdfsTableSink hdfsTableSink = new THdfsTableSink(
@@ -234,6 +303,16 @@ public class HdfsTableSink extends TableSink {
     if (skipHeaderLineCount > 0) {
       hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
     }
+
+    org.apache.hadoop.hive.metastore.api.Table msTbl = table.getMetaStoreTable();
+    Map<String, String> params = msTbl.getParameters();
+    String parquetBloomTblProp = params.get(PARQUET_BLOOM_FILTER_WRITING_TBL_PROPERTY);
+    if (parquetBloomTblProp != null) {
+      Map<String, Long> parsedProperties = parseParquetBloomFilterWritingTblProp(
+          parquetBloomTblProp);
+      hdfsTableSink.setParquet_bloom_filter_col_info(parsedProperties);
+    }
+
     hdfsTableSink.setSort_columns(sortColumns_);
     hdfsTableSink.setSorting_order(sortingOrder_);
     hdfsTableSink.setIs_result_sink(isResultSink_);
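The table property format handled by parseParquetBloomFilterWritingTblProp() can be tried out with a short stand-alone model. The Python sketch below mirrors the clamping and rounding steps of the Java method under stated assumptions: the function names are hypothetical, round_up_to_power_of_2() stands in for BitUtil.roundUpToPowerOf2(), and edge cases such as trailing commas are not guaranteed to behave like Java's String.split().

```python
MAX_BYTES = 128 * 1024 * 1024  # PARQUET_BLOOM_FILTER_MAX_BYTES
MIN_BYTES = 64                 # PARQUET_BLOOM_FILTER_MIN_BYTES


def round_up_to_power_of_2(n):
    """Smallest power of two that is not less than n (for n >= 1)."""
    return 1 << (n - 1).bit_length()


def parse_bloom_filter_tbl_prop(prop):
    """Return {column name: bitset size in bytes}, or None on a malformed entry."""
    result = {}
    for pair in prop.split(","):
        tokens = pair.split(":")
        if len(tokens) > 2:
            return None
        if len(tokens) == 1:
            size = MAX_BYTES  # no size given: use the maximal size
        else:
            try:
                size = int(tokens[1].strip())
            except ValueError:
                return None
            # Clamp to the supported range first, then round up.
            size = min(MAX_BYTES, max(MIN_BYTES, size))
            size = round_up_to_power_of_2(size)
        result[tokens[0].strip()] = size
    return result
```

For the example in the comment above, "col1:1024,col2,col4:100" yields 1024 bytes for col1, the maximal size for col2, and 128 bytes for col4 because 100 is rounded up to the next power of two.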
diff --git a/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java b/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java
new file mode 100644
index 0000000..b0ede1a
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/planner/ParquetBloomFilterTblPropParserTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Random;
+
+public class ParquetBloomFilterTblPropParserTest {
+  private static final Logger LOG = Logger.getLogger(
+      ParquetBloomFilterTblPropParserTest.class);
+
+  @Test
+  public void testParsingOnlyColNames() throws Exception {
+    final String props = "col1,col2,col3";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES
+      );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingAllSizesGiven() {
+    final String props = "col1:128,col2:256,col3:64";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128L,
+      "col2", 256L,
+      "col3", 64L
+      );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingSomeSizesGiven() {
+    final String props = "col1:128,col2,col3:64";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128L,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", 64L
+    );
+
+    parseAndCheck(props, exp);
+  }
+
+  @Test
+  public void testParsingContainsWhitespace() {
+    final String props = "col1 : 128, col2, \ncol3: 64 \t";
+    final Map<String, Long> exp = ImmutableMap.of(
+      "col1", 128L,
+      "col2", HdfsTableSink.PARQUET_BLOOM_FILTER_MAX_BYTES,
+      "col3", 64L
+    );
+
+    parseAndCheck(props, exp);
+  }
+
+  private void parseAndCheck(final String tbl_props, final Map<String, Long> exp_res) {
+    final Map<String, Long> res = HdfsTableSink.parseParquetBloomFilterWritingTblProp(
+        tbl_props);
+    assertEquals(exp_res, res);
+  }
+}
diff --git a/tests/query_test/test_parquet_bloom_filter.py b/tests/query_test/test_parquet_bloom_filter.py
index 8ae2eac..998cfd3 100644
--- a/tests/query_test/test_parquet_bloom_filter.py
+++ b/tests/query_test/test_parquet_bloom_filter.py
@@ -15,8 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import math
+import os
+
+from collections import namedtuple
+from parquet.ttypes import BloomFilterHeader
+from subprocess import check_call
+
 from tests.common.file_utils import create_table_and_copy_files
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.filesystem_utils import get_fs_path
+from tests.util.get_parquet_metadata import (
+    get_parquet_metadata,
+    read_serialized_object
+)
 
 
 class TestParquetBloomFilter(ImpalaTestSuite):
@@ -24,6 +36,35 @@ class TestParquetBloomFilter(ImpalaTestSuite):
   This suite tests Parquet Bloom filter optimizations.
   """
 
+  # Filename relative to $IMPALA_HOME. Some functions ('create_table_and_copy_files')
+  # prepend $IMPALA_HOME automatically, so we cannot include it here. Other functions,
+  # like 'get_parquet_metadata', need a full path.
+  PARQUET_TEST_FILE = 'testdata/data/parquet-bloom-filtering.parquet'
+  BloomFilterData = namedtuple('BloomFilterData', ['header', 'directory'])
+
+  # The statement used to create the test tables.
+  create_stmt = 'create table {db}.{tbl} (        '\
+                '  int8_col TINYINT,              '\
+                '  int16_col SMALLINT,            '\
+                '  int32_col INT,                 '\
+                '  int64_col BIGINT,              '\
+                '  float_col FLOAT,               '\
+                '  double_col DOUBLE,             '\
+                '  string_col STRING,             '\
+                '  char_col VARCHAR(3)            '\
+                ')                                '\
+                'stored as parquet                '
+  set_tbl_props_stmt = 'alter table {db}.{tbl} '                            \
+                       'set TBLPROPERTIES ("parquet.bloom.filter.columns"="'\
+                       'int8_col   : 512,'                                  \
+                       'int16_col  : 512,'                                  \
+                       'int32_col  : 512,'                                  \
+                       'int64_col  : 512,'                                  \
+                       'float_col  : 512,'                                  \
+                       'double_col : 512,'                                  \
+                       'string_col : 512,'                                  \
+                       '");'
+
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -36,7 +77,7 @@ class TestParquetBloomFilter(ImpalaTestSuite):
 
   def test_parquet_bloom_filtering(self, vector, unique_database):
     """ Tests that Parquet Bloom filtering works when it is enabled. """
-    self.create_test_database(unique_database)
+    self._create_test_table_from_file(str(unique_database), self.PARQUET_TEST_FILE)
 
     # The test makes assumptions about the number of row groups that are processed and
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
@@ -44,9 +85,9 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = True
     self.run_test_case('QueryTest/parquet-bloom-filter', vector, unique_database)
 
-  def test_parquet_bloom_filtering_off(self, vector, unique_database):
+  def test_parquet_bloom_filtering_disabled(self, vector, unique_database):
     """ Check that there is no Parquet Bloom filtering when it is disabled. """
-    self.create_test_database(unique_database)
+    self._create_test_table_from_file(str(unique_database), self.PARQUET_TEST_FILE)
 
     # The test makes assumptions about the number of row groups that are processed and
     # skipped inside a fragment, so we ensure that the tests run in a single fragment.
@@ -54,18 +95,251 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = False
     self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, unique_database)
 
-  def create_test_database(self, unique_database):
-    create_stmt = 'create table {db}.{tbl} (        '\
-                  '  int8_col TINYINT,              '\
-                  '  int16_col SMALLINT,            '\
-                  '  int32_col INT,                 '\
-                  '  int64_col BIGINT,              '\
-                  '  float_col FLOAT,               '\
-                  '  double_col DOUBLE,             '\
-                  '  string_col STRING,             '\
-                  '  char_col VARCHAR(3)            '\
-                  ')                                '\
-                  'stored as parquet                '
-    create_table_and_copy_files(self.client, create_stmt,
-                                unique_database, 'parquet_bloom_filter',
-                                ['testdata/data/parquet-bloom-filtering.parquet'])
+  def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir):
+    # Get Bloom filters from the first row group of file PARQUET_TEST_FILE.
+    reference_col_to_bloom_filter = self._get_first_row_group_bloom_filters(
+        self.PARQUET_TEST_FILE)
+
+    # Create a new Parquet file with the same data as in PARQUET_TEST_FILE.
+    tbl_name = 'parquet_bloom_filter_writing'
+    hdfs_path = self._create_empty_test_table(vector, str(unique_database), tbl_name)
+    self._set_tbl_props_to_match_test_file(vector, str(unique_database), tbl_name)
+    self._populate_table(vector, str(unique_database), tbl_name)
+
+    # Get the created Parquet file and extract the Bloom filters.
+    col_to_bloom_filter_list = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        hdfs_path, tmpdir)
+    # There should be exactly one file as we have written one row group.
+    assert len(col_to_bloom_filter_list) == 1
+    col_to_bloom_filter = col_to_bloom_filter_list[0]
+    self._compare_bloom_filters_to_reference(
+        reference_col_to_bloom_filter, col_to_bloom_filter)
+
+    # Query one element that is present in column 'column_name' and one that is not. In
+    # the first case no row group should be skipped. In the second case the row group
+    # should be skipped because of dictionary filtering, not Bloom filtering: all the
+    # elements fit in the dictionary, and if there is a dictionary we use that instead of
+    # the Bloom filter.
+    column_name = 'int64_col'
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        0, ['NumBloomFilteredRowGroups: 0 (0)'], ['NumBloomFilteredRowGroups: 1 (1)'])
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        1, ['NumBloomFilteredRowGroups: 0 (0)', 'NumDictFilteredRowGroups: 1 (1)'],
+        ['NumBloomFilteredRowGroups: 1 (1)', 'NumDictFilteredRowGroups: 0 (0)'])
+
+  def test_fallback_from_dict(self, vector, unique_database, tmpdir):
+    """ Tests falling back from dict encoding to plain encoding and using a Bloom filter
+    after reaching the max dict size. """
+    tbl_name = 'fallback_from_dict'
+    column_name = 'col'
+    self._create_table_dict_overflow(vector, str(unique_database), tbl_name,
+        column_name, True)
+
+    # Get the created Parquet file and extract the Bloom filters.
+    hdfs_path = self._get_hdfs_path(str(unique_database), tbl_name)
+    col_idx_to_bloom_filter_list = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        hdfs_path, tmpdir)
+    # There should be exactly one file as we have written one row group.
+    assert len(col_idx_to_bloom_filter_list) == 1
+    col_idx_to_bloom_filter = col_idx_to_bloom_filter_list[0]
+    # The index of the only column is 0.
+    bloom_filter_data = col_idx_to_bloom_filter[0]
+    bitset = bloom_filter_data.directory
+
+    # We should have inserted 'max_dict' + 1 elements into the Bloom filter when falling
+    # back. If the implementation is incorrect and we did not copy the elements in the
+    # dictionary of the dict encoding to the Bloom filter, only 1 element will have been
+    # inserted, meaning that exactly 8 bytes have non-zero values. If there are more
+    # non-zero bytes we can assume that the implementation does not have this error.
+    assert isinstance(bitset, bytes)
+    nonzero = 0
+    for byte in bitset:
+      if byte != 0:
+        nonzero += 1
+    assert nonzero > 8
+
+    # Query one element that is present in the table and one that is not, and check that
+    # the row group is not skipped in the first case and is skipped in the second.
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        0, ['NumBloomFilteredRowGroups: 0 (0)'], ['NumBloomFilteredRowGroups: 1 (1)'])
+    self._query_element_check_profile(vector, str(unique_database), tbl_name, column_name,
+        1, ['NumBloomFilteredRowGroups: 1 (1)'], ['NumBloomFilteredRowGroups: 0 (0)'])
+
+  def test_fallback_from_dict_if_no_bloom_tbl_props(self, vector, unique_database,
+      tmpdir):
+    """ Tests falling back from dict encoding to plain encoding when Bloom filtering is
+    not enabled in table properties, after reaching the max dict size. We check that no
+    Bloom filter is written. """
+    tbl_name = 'fallback_from_dict'
+    column_name = 'col'
+    self._create_table_dict_overflow(vector, str(unique_database), tbl_name, column_name,
+        False)
+
+    # Get the created Parquet file.
+    hdfs_path = self._get_hdfs_path(str(unique_database), tbl_name)
+    col_idx_to_bloom_filter_list = self._get_first_row_group_bloom_filters_from_hdfs_dir(
+        hdfs_path, tmpdir)
+    # There should be exactly one file as we have written one row group.
+    assert len(col_idx_to_bloom_filter_list) == 1
+    col_idx_to_bloom_filter = col_idx_to_bloom_filter_list[0]
+    # There should be no Bloom filter.
+    assert(len(col_idx_to_bloom_filter) == 0)
+
+  def _query_element_check_profile(self, vector, db_name, tbl_name, col_name,
+          element, strings_in_profile, strings_not_in_profile):
+    """ Runs a query filtering on column 'col_name' having value 'element' and asserts
+    that the query profile contains the strings in 'strings_in_profile' and that it does
+    not contain the strings in 'strings_not_in_profile'. Can be used for example to
+    check whether the Bloom filter was used. """
+    query_stmt = 'select {col_name} from {db}.{tbl} where {col_name} = {value}'
+
+    result_in_table = self.execute_query(query_stmt.format(col_name=col_name,
+        db=db_name, tbl=tbl_name, value=element),
+        vector.get_value('exec_option'))
+    for s in strings_in_profile:
+      assert s in result_in_table.runtime_profile
+    for s in strings_not_in_profile:
+      assert s not in result_in_table.runtime_profile
+
+  def _create_table_dict_overflow(self, vector, db_name, tbl_name, column_name,
+      bloom_tbl_prop):
+    max_dict_size = 40000
+    ndv = max_dict_size + 1
+    fpp = 0.05
+    bitset_size = self._optimal_bitset_size(ndv, fpp)
+
+    # We create a table with a single BIGINT column, optionally with table properties for
+    # Bloom filtering.
+    create_stmt = 'create table {db}.{tbl} ({col_name} BIGINT) stored as parquet'
+    if bloom_tbl_prop:
+      create_stmt += ' TBLPROPERTIES("parquet.bloom.filter.columns"="{col_name}:{size}")'
+    create_stmt = create_stmt.format(
+        db=db_name, tbl=tbl_name, col_name=column_name, size=bitset_size)
+
+    # We only insert even numbers so an odd number should be filtered out based on the
+    # Bloom filter.
+    values = ['({})'.format(i * 2) for i in range(ndv)]
+    insert_stmt = 'insert into {db}.{tbl} values {values}'.format(
+        db=db_name, tbl=tbl_name, values=','.join(values))
+
+    vector.get_value('exec_option')['num_nodes'] = 1
+    vector.get_value('exec_option')['parquet_bloom_filter_write'] = 'IF_NO_DICT'
+    self.execute_query(create_stmt, vector.get_value('exec_option'))
+    self.execute_query(insert_stmt, vector.get_value('exec_option'))
+
+  def _optimal_bitset_size(self, ndv, fpp):
+    """ Based on ParquetBloomFilter::OptimalByteSize() in
+    be/src/util/parquet-bloom-filter.h """
+    log_res = None
+    if (ndv == 0):
+      log_res = 0
+    else:
+      words_in_bucket = 8.0
+      m = -words_in_bucket * ndv / math.log(1 - math.pow(fpp, 1.0 / words_in_bucket))
+      log_res = max(0, math.ceil(math.log(m / 8, 2)))
+    return int(2 ** log_res)
+
+  def _create_test_table_from_file(self, db_name, filename):
+    create_table_and_copy_files(self.client, self.create_stmt,
+                                db_name, 'parquet_bloom_filter', [filename])
+
+  def _create_empty_test_table(self, vector, db_name, tbl_name):
+    self.execute_query("drop table if exists {0}.{1}".format(db_name, tbl_name))
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = self.create_stmt.format(db=db_name, tbl=tbl_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+    return self._get_hdfs_path(db_name, tbl_name)
+
+  def _get_hdfs_path(self, db_name, tbl_name):
+    return get_fs_path('/test-warehouse/{0}.db/{1}/'.format(db_name, tbl_name))
+
+  def _set_tbl_props_to_match_test_file(self, vector, db_name, tbl_name):
+    vector.get_value('exec_option')['num_nodes'] = 1
+    query = self.set_tbl_props_stmt.format(db=db_name, tbl=tbl_name)
+    self.execute_query(query, vector.get_value('exec_option'))
+
+  def _populate_table(self, vector, db_name, tbl_name):
+    # Populate the table with even numbers, as in the first row group in the file
+    # PARQUET_TEST_FILE
+    query_format = 'insert into {db}.{tbl} values {{values}}'.format(
+        db=db_name, tbl=tbl_name)
+    rows = []
+    for i in range(100):
+      k = (i * 2) % 128
+      row_values = '({0}, {0}, {0}, {0}, {0}.0, {0}.0, \
+          "{0}", cast("{0}" as VARCHAR(3)))'.format(k)
+      rows.append(row_values)
+    vector.get_value('exec_option')['num_nodes'] = 1
+    vector.get_value('exec_option')['parquet_bloom_filter_write'] = 'ALWAYS'
+    self.execute_query(query_format.format(values=", ".join(rows)),
+        vector.get_value('exec_option'))
+
+  def _get_first_row_group_bloom_filters_from_hdfs_dir(self, hdfs_path, tmpdir):
+    """ Returns the Bloom filters from the first row group (like
+    _get_first_row_group_bloom_filters) of each file in the HDFS directory. """
+    # Get the created Parquet file and extract the Bloom filters.
+    check_call(['hdfs', 'dfs', '-get', hdfs_path, tmpdir.strpath])
+    col_to_bloom_filter_list = []
+    for root, _subdirs, files in os.walk(tmpdir.strpath):
+      for filename in files:
+        parquet_file = os.path.join(root, str(filename))
+        col_to_bloom_filter_list.append(
+            self._get_first_row_group_bloom_filters(parquet_file))
+    return col_to_bloom_filter_list
+
+  def _get_first_row_group_bloom_filters(self, parquet_file):
+    # Other functions take a filename relative to $IMPALA_HOME and prepend the path of
+    # $IMPALA_HOME themselves, but this one does not, so we have to prepend it ourselves.
+    filename = os.path.join(os.environ['IMPALA_HOME'],
+        parquet_file)
+    file_meta_data = get_parquet_metadata(filename)
+    # We only support flat schemas; the additional element is the root element.
+    schemas = file_meta_data.schema[1:]
+    # We are only interested in the first row group.
+    row_group = file_meta_data.row_groups[0]
+    assert len(schemas) == len(row_group.columns)
+    col_to_bloom_filter = dict()
+    with open(filename) as file_handle:
+      for i, column in enumerate(row_group.columns):
+        column_meta_data = column.meta_data
+        if column_meta_data and column_meta_data.bloom_filter_offset:
+          bloom_filter = self._try_read_bloom_filter(file_handle,
+              column_meta_data.bloom_filter_offset)
+          if bloom_filter:
+            col_to_bloom_filter[i] = bloom_filter
+    return col_to_bloom_filter
+
+  def _try_read_bloom_filter(self, file_handle, bloom_filter_offset):
+    (header, header_size) = self._try_read_bloom_filter_header(
+        file_handle, bloom_filter_offset)
+    if header is None:
+      return None
+    file_handle.seek(bloom_filter_offset + header_size)
+    bloom_filter_bytes = file_handle.read(header.numBytes)
+    return self.BloomFilterData(header, bloom_filter_bytes)
+
+  def _try_read_bloom_filter_header(self, file_handle, bloom_filter_offset):
+    """ Returns the Bloom filter header and its size. If it is not found, None is returned
+    instead of the header and the size is unspecified. """
+    header = None
+    header_size = 8
+    while (header_size <= 1024 and header is None):
+      try:
+        header = read_serialized_object(BloomFilterHeader, file_handle,
+          bloom_filter_offset, header_size)
+      except EOFError:
+        header_size *= 2
+    return (header, header_size)
+
+  def _compare_bloom_filters_to_reference(self,
+       reference_col_to_bloom_filter, col_to_bloom_filter):
+    expected_cols = [0, 1, 2, 3, 4, 5, 6]
+    assert sorted(col_to_bloom_filter.keys()) == expected_cols,\
+        "All columns except the last one (VARCHAR(3)) should have a Bloom filter."
+    for col in expected_cols:
+      (exp_header, exp_directory) = reference_col_to_bloom_filter[col]
+      (header, directory) = col_to_bloom_filter[col]
+
+      assert exp_header == header
+      assert exp_directory == directory,\
+          "Incorrect directory for Bloom filter for column no. {}.".format(col)

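The sizing arithmetic used by `_optimal_bitset_size` in the test above can be tried in isolation. The following is a minimal standalone sketch of the same formula, not the Impala implementation; the function name `optimal_bitset_size` is ours, and reading `m` as a bit count is an assumption based on the division by 8 in the helper.

```python
import math


def optimal_bitset_size(ndv, fpp):
    """Return a power-of-two size in bytes for 'ndv' distinct values and a
    target false-positive probability 'fpp' (same arithmetic as the test helper)."""
    if ndv == 0:
        return 1
    words_in_bucket = 8.0
    # ASSUMPTION: 'm' is a number of bits, hence the division by 8 below.
    m = -words_in_bucket * ndv / math.log(1 - math.pow(fpp, 1.0 / words_in_bucket))
    # Round the byte count up to the next power of two.
    log_res = max(0, math.ceil(math.log(m / 8, 2)))
    return int(2 ** log_res)


if __name__ == "__main__":
    # 40001 distinct values and fpp = 0.05 are the inputs used by
    # _create_table_dict_overflow in the test.
    print(optimal_bitset_size(40001, 0.05))
```

Rounding up to a power of two means the size passed in the table property is usually larger than the strict minimum for the requested false-positive probability.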
[impala] 01/03: IMPALA-10490: Fix illegalStateException in drop stats

Posted by bo...@apache.org.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 85ebfe220aa6fe654b860d5f3fe1d8902940de0f
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Jul 7 14:58:07 2021 -0700

    IMPALA-10490: Fix illegalStateException in drop stats
    
    If a table was created and incremental stats were computed on it
    while events processing was turned off, then a drop stats or truncate
    table command on that table fails with IllegalStateException once
    events processing is turned on. This happens because the
    catalog service identifiers are not found in the table properties
    while the partitions of the table are being altered. This patch adds
    the catalog service identifiers in the drop stats, truncate and
    comment on statement code paths to fix the error.
    
    Testing:
    1. Added more statements to the test_self_events test to cover the
    newly added logic.
    2. Manually tested the following scenario, which was previously failing:
      a. start catalogd without events processing.
      b. create partitioned table and compute incremental stats on it.
      c. restart catalogd with events processing turned on.
      d. issue drop stats command.
    
    Change-Id: Iaa0b4043879370c52049d22acb49c847b0be1c68
    Reviewed-on: http://gerrit.cloudera.org:8080/17659
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/service/CatalogOpExecutor.java    | 12 ++++++++++++
 tests/custom_cluster/test_event_processing.py           | 17 ++++++++++++++---
 2 files changed, 26 insertions(+), 3 deletions(-)
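The commit message above describes stamping catalog service identifiers into the table properties before the table is altered. A minimal Python sketch of how such identifiers could be used to recognize self-generated events is given below. It is an illustration only: the class `Table`, the property keys, and all function names are hypothetical and do not mirror the Java code in CatalogOpExecutor, and the matching logic is our assumption about what the `test_self_events` test exercises.

```python
# Hypothetical property keys; the real keys are not shown in this email.
SERVICE_ID_KEY = "example.catalog.service.id"
VERSION_KEY = "example.catalog.version"


class Table(object):
    """Toy stand-in for a catalog table."""

    def __init__(self, name):
        self.name = name
        self.properties = {}
        self.inflight_versions = set()


def add_catalog_service_identifiers(table, service_id, version):
    # Stamp the table so that the resulting metastore event carries our identity.
    table.properties[SERVICE_ID_KEY] = service_id
    table.properties[VERSION_KEY] = str(version)


def add_version_for_inflight_events(table, version):
    # Remember that an event with this version is expected to come back.
    table.inflight_versions.add(version)


def is_self_event(table, event_properties, own_service_id):
    """Return True if the event was produced by this catalog and is still pending."""
    if event_properties.get(SERVICE_ID_KEY) != own_service_id:
        return False
    version = int(event_properties.get(VERSION_KEY, -1))
    if version in table.inflight_versions:
        table.inflight_versions.discard(version)
        return True
    return False


if __name__ == "__main__":
    tbl = Table("t")
    add_catalog_service_identifiers(tbl, "svc-1", 7)
    add_version_for_inflight_events(tbl, 7)
    print(is_self_event(tbl, dict(tbl.properties), "svc-1"))
```

In this toy model an event without the identifiers can never be matched, which is the situation the commit message describes for tables altered before the fix.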

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1fc20fc..3b77cc9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2099,6 +2099,8 @@ public class CatalogOpExecutor {
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
+      addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       if (params.getPartition_set() == null) {
         // TODO: Report the number of updated partitions/columns to the user?
         // TODO: bulk alter the partitions.
@@ -2124,6 +2126,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
           /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP STATS");
+      catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
       addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
@@ -2667,6 +2670,7 @@ public class CatalogOpExecutor {
       Preconditions.checkState(newCatalogVersion > 0);
       addSummary(resp, "Table has been truncated.");
       loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE");
+      catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
       addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
@@ -2710,6 +2714,8 @@ public class CatalogOpExecutor {
             table.getFullName(), sw.elapsed(TimeUnit.MILLISECONDS));
         newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
         catalog_.getLock().writeLock().unlock();
+        addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+            newCatalogVersion);
         TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
             table.getMetaStoreTable(), txn.getId());
         HdfsTable hdfsTable = (HdfsTable) table;
@@ -2810,6 +2816,8 @@ public class CatalogOpExecutor {
     Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     catalog_.getLock().writeLock().unlock();
+    addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+        newCatalogVersion);
     HdfsTable hdfsTable = (HdfsTable) table;
     boolean isTableBeingReplicated = false;
     Stopwatch sw = Stopwatch.createStarted();
@@ -6334,6 +6342,7 @@ public class CatalogOpExecutor {
       }
       applyAlterTable(msTbl);
       loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
+      catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
     } finally {
@@ -6350,6 +6359,8 @@ public class CatalogOpExecutor {
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
+      addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       if (tbl instanceof KuduTable) {
         TColumn new_col = new TColumn(columnName,
             tbl.getColumn(columnName).getType().toThrift());
@@ -6369,6 +6380,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(tbl, newCatalogVersion, false, true, null,
           "ALTER COLUMN COMMENT");
+      catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, "Column has been altered.");
     } finally {
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index c9679e6..e5ed34d 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -659,7 +659,10 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
           "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),
           "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name,
-                                                                                tbl_name),
+            tbl_name),
+          "comment on table {0}.{1} IS 'table level comment'".format(db_name, tbl_name),
+          "comment on column {0}.{1}.C1 IS 'column level comment'".format(db_name,
+            tbl_name),
           "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name),
           "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name),
           "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name),
@@ -670,6 +673,8 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name),
           # compute stats will generate ALTER_PARTITION
           "compute stats {0}.{1}".format(db_name, tbl_name),
+          "compute incremental stats {0}.{1}".format(db_name, tbl_name),
+          "drop stats {0}.{1}".format(db_name, tbl_name),
           # insert into an existing partition; generates INSERT self-event
           "insert into table {0}.{1} partition "
           "(year, month) select * from functional.alltypessmall where year=2009 "
@@ -677,8 +682,14 @@ class TestEventProcessing(CustomClusterTestSuite):
           # insert overwrite query from Impala also generates an INSERT self-event
           "insert overwrite table {0}.{1} partition "
           "(year, month) select * from functional.alltypessmall where year=2009 "
-          "and month=1".format(db_name, tbl_name)],
-      # Queries which will not increment the events-skipped counter
+          "and month=1".format(db_name, tbl_name),
+          # The events processor currently doesn't process delete column stats events.
+          # However, with incremental stats there can be alter table and alter
+          # partition events, which should be ignored. Hence we run compute incremental
+          # stats first to make sure that the alter events generated by the truncate
+          # table command are ignored.
+          "compute incremental stats {0}.{1}".format(db_name, tbl_name),
+          "truncate table {0}.{1}".format(db_name, tbl_name)],
       False: [
           "create table {0}.{1} like functional.alltypessmall "
           "stored as parquet".format(db_name, tbl_name),

[impala] 03/03: IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale

Posted by bo...@apache.org.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9d46255739f94c53d686f670ee7b5981db59c148
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Jun 25 16:29:16 2021 +0200

    IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale
    
    IMPALA-7087 is about reading Parquet decimal columns with lower
    precision/scale than the table metadata.
    IMPALA-8131 is about reading Parquet decimal columns with higher
    scale than the table metadata.
    
    Both are resolved by this patch. It reuses some parts from an
    earlier change request from Sahil Takiar:
    https://gerrit.cloudera.org/#/c/12163/
    
    A new utility class, ParquetDataConverter, has been introduced to do
    the data conversion. It also helps to decide whether data conversion
    is needed or not.
    
    NULL values are returned in case of overflows. This behavior is
    consistent with Hive.
    
    The Parquet column stats reader is also updated to convert the decimal
    values. The stats reader is used to evaluate min/max conjuncts. This
    works well because later we also evaluate the conjuncts on the
    converted values anyway.
    
    Status of the different kinds of filtering:
     * dictionary filtering: disabled for columns that need conversion
     * runtime bloom filters: work on the converted values
     * runtime min/max filters: work on the converted values
    
    This patch also enables schema evolution of decimal columns of Iceberg
    tables.
    
    Testing:
     * added e2e tests
    
    Change-Id: Icefa7e545ca9f7df1741a2d1225375ecf54434da
    Reviewed-on: http://gerrit.cloudera.org:8080/17678
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/parquet/parquet-column-readers.cc      | 142 +++++--------
 be/src/exec/parquet/parquet-column-stats.cc        |  43 +++-
 be/src/exec/parquet/parquet-column-stats.h         |   9 +-
 be/src/exec/parquet/parquet-data-converter.h       | 184 ++++++++++++++++
 be/src/exec/parquet/parquet-metadata-utils.cc      |  30 +--
 be/src/exprs/decimal-operators-ir.cc               |  18 +-
 be/src/runtime/decimal-value.h                     |   4 +-
 be/src/runtime/decimal-value.inline.h              |   8 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   8 +-
 testdata/data/README                               |   5 +
 ...ry_decimal_precision_and_scale_widening.parquet | Bin 0 -> 984 bytes
 .../queries/QueryTest/iceberg-alter.test           |  21 ++
 .../queries/QueryTest/iceberg-negative.test        |   6 -
 ...rquet-decimal-precision-and-scale-altering.test | 236 +++++++++++++++++++++
 ...rquet-decimal-precision-and-scale-widening.test | 104 +++++++++
 tests/query_test/test_scanners.py                  |  55 +++++
 16 files changed, 718 insertions(+), 155 deletions(-)
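The conversion described in the commit message (rescale a decimal read from the file to the table's scale, and return NULL on overflow) can be sketched in a few lines of Python. This is an illustrative sketch under stated assumptions, not the logic of ParquetDataConverter: the function name `convert_decimal` is hypothetical, decimals are modelled as unscaled integers, and the rounding mode used when digits are dropped is an assumption, since the commit message does not state it.

```python
def convert_decimal(unscaled, file_scale, slot_precision, slot_scale):
    """Rescale 'unscaled' from 'file_scale' to 'slot_scale'.

    Returns the new unscaled integer, or None (standing in for NULL) if the
    result does not fit into 'slot_precision' decimal digits."""
    if slot_scale >= file_scale:
        # Widening the scale only appends zeros.
        result = unscaled * 10 ** (slot_scale - file_scale)
    else:
        # ASSUMPTION: round half away from zero when digits are dropped.
        divisor = 10 ** (file_scale - slot_scale)
        quotient, remainder = divmod(abs(unscaled), divisor)
        if 2 * remainder >= divisor:
            quotient += 1
        result = quotient if unscaled >= 0 else -quotient
    # Overflow check against the precision of the table column.
    if abs(result) >= 10 ** slot_precision:
        return None
    return result


if __name__ == "__main__":
    # 123.45 stored with scale 2, read into DECIMAL(10, 4) and DECIMAL(5, 4).
    print(convert_decimal(12345, 2, 10, 4))
    print(convert_decimal(12345, 2, 5, 4))
```

Returning None instead of raising keeps the scan going for the remaining rows, which matches the NULL-on-overflow behavior the commit message describes as consistent with Hive.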

diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index d34dbe3..86131ff 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -22,6 +22,7 @@
 
 #include "exec/parquet/hdfs-parquet-scanner.h"
 #include "exec/parquet/parquet-bool-decoder.h"
+#include "exec/parquet/parquet-data-converter.h"
 #include "exec/parquet/parquet-level-decoder.h"
 #include "exec/parquet/parquet-metadata-utils.h"
 #include "exec/scratch-tuple-batch.h"
@@ -208,10 +209,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
   /// must specialize this function.
-  inline bool NeedsConversionInline() const {
-    DCHECK(!needs_conversion_);
-    return false;
-  }
+  inline bool NeedsConversionInline() const;
 
   /// Similar to NeedsConversion(), most column readers do not require validation,
   /// so to avoid branches, we return constant false. In general, types where not
@@ -223,8 +221,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
   bool ConvertSlot(const InternalType* src, void* slot) {
-    DCHECK(false);
-    return false;
+    return data_converter_.ConvertSlot(src, slot);
   }
 
   /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
@@ -251,21 +248,22 @@ class ScalarColumnReader : public BaseScalarColumnReader {
         PrintThriftEnum(page_encoding_), col_chunk_reader_.stream()->file_offset());
   }
 
+  ParquetTimestampDecoder& GetTimestampDecoder() {
+    return data_converter_.timestamp_decoder();
+  }
+
   /// Dictionary decoder for decoding column values.
   DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
   bool dict_decoder_init_ = false;
 
-  /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_ = false;
-
   /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
 
-  /// Contains extra data needed for Timestamp decoding.
-  ParquetTimestampDecoder timestamp_decoder_;
+  /// Converts values if needed.
+  ParquetDataConverter<InternalType, MATERIALIZED> data_converter_;
 
   /// Contains extra state required to decode boolean values. Only initialised for
   /// BOOLEAN columns.
@@ -279,7 +277,9 @@ template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIAL
 ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
     HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
   : BaseScalarColumnReader(parent, node, slot_desc),
-    dict_decoder_(parent->scan_node_->mem_tracker()) {
+    dict_decoder_(parent->scan_node_->mem_tracker()),
+    data_converter_(node.element,
+                    MATERIALIZED ? &slot_desc->type() : nullptr) {
   if (!MATERIALIZED) {
     // We're not materializing any values, just counting them. No need (or ability) to
     // initialize state used to materialize values.
@@ -297,18 +297,43 @@ ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader
     fixed_len_size_ = -1;
   }
 
-  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
-
   if (slot_desc_->type().type == TYPE_TIMESTAMP) {
-    timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
-    dict_decoder_.SetTimestampHelper(timestamp_decoder_);
-    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+    data_converter_.SetTimestampDecoder(parent->CreateTimestampDecoder(*node.element));
+    dict_decoder_.SetTimestampHelper(GetTimestampDecoder());
   }
   if (slot_desc_->type().type == TYPE_BOOLEAN) {
     bool_decoder_ = make_unique<ParquetBoolDecoder>();
   }
 }
 
+template <typename T>
+struct IsDecimalValue {
+  static constexpr bool value =
+    std::is_same<T, Decimal4Value>::value ||
+    std::is_same<T, Decimal8Value>::value ||
+    std::is_same<T, Decimal16Value>::value;
+};
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+inline bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
+::NeedsConversionInline() const {
+  //TODO: use constexpr ifs when we switch to C++17.
+  if /* constexpr */ (MATERIALIZED) {
+    if /* constexpr */ (IsDecimalValue<InternalType>::value) {
+      return data_converter_.NeedsConversion();
+    }
+    if /* constexpr */ (std::is_same<InternalType, TimestampValue>::value) {
+      return data_converter_.NeedsConversion();
+    }
+    if /* constexpr */ (std::is_same<InternalType, StringValue>::value &&
+                        PARQUET_TYPE == parquet::Type::BYTE_ARRAY) {
+      return data_converter_.NeedsConversion();
+    }
+  }
+  DCHECK(!data_converter_.NeedsConversion());
+  return false;
+}
+
 // TODO: consider performing filter selectivity checks in this function.
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
@@ -675,14 +700,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
       reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
 
   if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
-  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr)) ||
+      (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot)))) {
     if (UNLIKELY(!parent_->parse_status_.ok())) return false;
     // The value is invalid but execution should continue - set the null indicator and
     // skip conversion.
     tuple->SetNull(null_indicator_offset_);
     return true;
   }
-  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
   return true;
 }
 
@@ -709,16 +734,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConver
   uint8_t* curr_tuple = tuple_mem;
   for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
     Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-    if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
+    if ((NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) ||
+        UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
       if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      // The value is invalid but execution should continue - set the null indicator and
-      // skip conversion.
+      // The value or the conversion is invalid but execution should continue - set the
+      // null indicator.
       tuple->SetNull(null_indicator_offset_);
       continue;
     }
-    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
-      return false;
-    }
   }
   return true;
 }
@@ -772,14 +795,15 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
 }
 
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
-// out to 'timestamp_decoder_'.
+// out to the timestamp decoder.
 template <>
 template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
     true>::DecodeValue<Encoding::PLAIN>(uint8_t** RESTRICT data,
     const uint8_t* RESTRICT data_end, TimestampValue* RESTRICT val) RESTRICT {
   DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-  int encoded_len = timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
+  int encoded_len =
+      GetTimestampDecoder().Decode<parquet::Type::INT64>(*data, data_end, val);
   if (UNLIKELY(encoded_len < 0)) {
     SetPlainDecodeError();
     return false;
@@ -834,14 +858,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
 }
 
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
-// out to 'timestamp_decoder_'.
+// out to the timestamp decoder.
 template <>
 template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
     true>::DecodeValues<Encoding::PLAIN>(int64_t stride, int64_t count,
     TimestampValue* RESTRICT out_vals) RESTRICT {
   DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-  int64_t encoded_len = timestamp_decoder_.DecodeBatch<parquet::Type::INT64>(
+  int64_t encoded_len = GetTimestampDecoder().DecodeBatch<parquet::Type::INT64>(
       data_, data_end_, count, stride, out_vals);
   if (UNLIKELY(encoded_len < 0)) {
     SetPlainDecodeError();
@@ -881,64 +905,6 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions
 }
 
 template <>
-inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
-    true>::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
-    const StringValue* src, void* slot) {
-  DCHECK(slot_desc() != nullptr);
-  DCHECK(slot_desc()->type().type == TYPE_CHAR);
-  int char_len = slot_desc()->type().len;
-  int unpadded_len = min(char_len, src->len);
-  char* dst_char = reinterpret_cast<char*>(slot);
-  memcpy(dst_char, src->ptr, unpadded_len);
-  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
-  return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(dst_ts);
-  return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
-  return true;
-}
-
-template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsValidationInline() const {
   return true;
@@ -1457,11 +1423,9 @@ static ParquetColumnReader* CreateDecimalColumnReader(
       }
       break;
     case parquet::Type::INT32:
-      DCHECK_EQ(sizeof(Decimal4Value::StorageType), slot_desc->type().GetByteSize());
       return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
           parent, node, slot_desc);
     case parquet::Type::INT64:
-      DCHECK_EQ(sizeof(Decimal8Value::StorageType), slot_desc->type().GetByteSize());
       return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
           parent, node, slot_desc);
     default:
diff --git a/be/src/exec/parquet/parquet-column-stats.cc b/be/src/exec/parquet/parquet-column-stats.cc
index 94670d0..2ef42f2 100644
--- a/be/src/exec/parquet/parquet-column-stats.cc
+++ b/be/src/exec/parquet/parquet-column-stats.cc
@@ -21,6 +21,8 @@
 #include <cmath>
 #include <limits>
 
+#include "exec/parquet/parquet-data-converter.h"
+
 #include "common/names.h"
 
 namespace impala {
@@ -162,14 +164,14 @@ bool ColumnStatsReader::ReadFromString(StatsField stats_field,
     case TYPE_DECIMAL:
       switch (col_type_.GetByteSize()) {
         case 4:
-          return ColumnStats<Decimal4Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal4Value>(encoded_value,
+              static_cast<Decimal4Value*>(slot));
         case 8:
-          return ColumnStats<Decimal8Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal8Value>(encoded_value,
+              static_cast<Decimal8Value*>(slot));
         case 16:
-          return ColumnStats<Decimal16Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal16Value>(encoded_value,
+              static_cast<Decimal16Value*>(slot));
         }
       DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize();
     case TYPE_DATE:
@@ -273,6 +275,8 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   int64_t pos = start_idx;
   InternalType* output = v;
+  ParquetDataConverter<InternalType, true> data_converter(&element_, &col_type_);
+  // TODO (IMPALA-10793): set timestamp decoder to correctly convert timestamps.
 
   /// We unroll the loop manually in batches of 8.
   constexpr int batch = 8;
@@ -281,9 +285,12 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   for (int64_t b = 0; b < full_batches; b++) {
 #pragma push_macro("DECODE_NO_CHECK_UNROLL")
-#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2) \
-    ColumnStats<InternalType>::DecodePlainValue( \
-        source[pos + i], output + i, PARQUET_TYPE);
+#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2)           \
+    ColumnStats<InternalType>::DecodePlainValue(              \
+        source[pos + i], output + i, PARQUET_TYPE);           \
+    if (UNLIKELY(data_converter.NeedsConversion())) {         \
+      data_converter.ConvertSlot(output + i, output + i);     \
+    }
 
     BOOST_PP_REPEAT_FROM_TO(0, 8 /* The value of `batch` */,
         DECODE_NO_CHECK_UNROLL, ignore);
@@ -295,6 +302,9 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   for (; pos < num_values; ++pos) {
     ColumnStats<InternalType>::DecodePlainValue(source[pos], output, PARQUET_TYPE);
+    if (UNLIKELY(data_converter.NeedsConversion())) {
+      data_converter.ConvertSlot(output, output);
+    }
     output++;
   }
 
@@ -340,6 +350,21 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheckFastTrack(
   return num_values;
 }
 
+template <typename DecimalType>
+bool ColumnStatsReader::DecodeDecimal(const std::string& stat_value,
+    DecimalType* slot) const {
+  bool ret = ColumnStats<DecimalType>::DecodePlainValue(stat_value, slot,
+      element_.type);
+  if (!ret) return false;
+  ParquetDataConverter<DecimalType, true> data_converter(&element_, &col_type_);
+  if (LIKELY(!data_converter.NeedsConversion())) return true;
+  // Convert the decimal value to the table's decimal type. It's OK to evaluate
+  // filters and min/max conjuncts against the converted values, as we also use
+  // the converted values later.
+  // No need for an extra buffer; the conversion can be done in place.
+  return data_converter.ConvertSlot(slot, slot);
+}
+
 bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value,
     ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const {
   bool stats_read = false;
diff --git a/be/src/exec/parquet/parquet-column-stats.h b/be/src/exec/parquet/parquet-column-stats.h
index 996ae04..5615d15 100644
--- a/be/src/exec/parquet/parquet-column-stats.h
+++ b/be/src/exec/parquet/parquet-column-stats.h
@@ -252,8 +252,9 @@ public:
   /// the minimum or maximum value.
   enum class StatsField { MIN, MAX };
 
-  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,  const ColumnType& col_type,
-      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element)
+  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,
+      const ColumnType& col_type, const parquet::ColumnOrder* col_order,
+      const parquet::SchemaElement& element)
   : col_chunk_(col_chunk),
     col_type_(col_type),
     col_order_(col_order),
@@ -347,6 +348,10 @@ private:
   /// order 'col_order_'. Otherwise, returns false.
   bool CanUseDeprecatedStats() const;
 
+  /// Decodes decimal 'stat_value' into 'slot'. Converts precision/scale if needed.
+  template <typename DecimalType>
+  bool DecodeDecimal(const std::string& stat_value, DecimalType* slot) const;
+
   /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
   /// necessary. Returns true if the decoding and conversions were successful.
   bool DecodeTimestamp(const std::string& stat_value,
diff --git a/be/src/exec/parquet/parquet-data-converter.h b/be/src/exec/parquet/parquet-data-converter.h
new file mode 100644
index 0000000..feb0cab
--- /dev/null
+++ b/be/src/exec/parquet/parquet-data-converter.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+
+#include "gen-cpp/parquet_types.h"
+
+#include "runtime/decimal-value.inline.h"
+#include "runtime/runtime-state.h"
+
+namespace impala {
+
+/// Utility class for converting Parquet data to the proper data type that is used
+/// in the tuple's slot.
+template <typename InternalType, bool MATERIALIZED>
+class ParquetDataConverter {
+ public:
+  ParquetDataConverter(const parquet::SchemaElement* elem, const ColumnType* col_type) :
+      parquet_element_(elem), col_type_(col_type) {
+    needs_conversion_ = CheckIfNeedsConversion();
+  }
+
+  /// Converts 'src' to the slot's type and writes the result into 'slot'.
+  /// It shouldn't be invoked when conversion is not needed.
+  bool ConvertSlot(const InternalType* src, void* slot) const {
+    DCHECK(false);
+    return false;
+  }
+
+  bool NeedsConversion() const { return needs_conversion_; }
+
+  /// Sets extra information that is only needed for decoding TIMESTAMPs.
+  void SetTimestampDecoder(const ParquetTimestampDecoder& timestamp_decoder) {
+    DCHECK_EQ(col_type_->type, TYPE_TIMESTAMP);
+    timestamp_decoder_ = timestamp_decoder;
+    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+  }
+
+  ParquetTimestampDecoder& timestamp_decoder() {
+    return timestamp_decoder_;
+  }
+
+ private:
+  /// Returns true if we need to do a conversion from the Parquet type to the slot type.
+  bool CheckIfNeedsConversion() {
+    if (!MATERIALIZED) return false;
+    if (col_type_->type == TYPE_TIMESTAMP) {
+      return timestamp_decoder_.NeedsConversion();
+    }
+    if (col_type_->type == TYPE_CHAR) {
+      return true;
+    }
+    if (col_type_->type == TYPE_DECIMAL) {
+      if (col_type_->precision != parquet_element_->precision) {
+        // Decimal values can be stored by Decimal4Value (4 bytes), Decimal8Value, and
+        // Decimal16Value. We only need to do a conversion for different precision if
+        // the values require different types (different byte size).
+        if (ColumnType::GetDecimalByteSize(parquet_element_->precision)
+            != col_type_->GetByteSize()) {
+          return true;
+        }
+      }
+      // Different scales require decimal conversion.
+      if (col_type_->scale != parquet_element_->scale) return true;
+    }
+    return false;
+  }
+
+  /// Converts decimal slot to proper precision/scale.
+  bool ConvertDecimalSlot(const InternalType* src, void* slot) const;
+  template <typename DecimalType>
+  bool ConvertDecimalScale(DecimalType* slot) const;
+
+  /// Parquet schema node of the field.
+  const parquet::SchemaElement* parquet_element_;
+  /// Impala column type of the field's slot.
+  const ColumnType* col_type_;
+  /// Contains extra data needed for Timestamp decoding.
+  ParquetTimestampDecoder timestamp_decoder_;
+  /// True if the slot needs conversion.
+  bool needs_conversion_ = false;
+};
+
+template <>
+inline bool ParquetDataConverter<StringValue, true>::ConvertSlot(
+    const StringValue* src, void* slot) const {
+  DCHECK_EQ(col_type_->type, TYPE_CHAR);
+  int char_len = col_type_->len;
+  int unpadded_len = std::min(char_len, src->len);
+  char* dst_char = reinterpret_cast<char*>(slot);
+  memcpy(dst_char, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
+  return true;
+}
+
+template <>
+inline bool ParquetDataConverter<TimestampValue, true>::ConvertSlot(
+    const TimestampValue* src, void* slot) const {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(timestamp_decoder_.NeedsConversion());
+  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
+  *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
+  timestamp_decoder_.ConvertToLocalTime(dst_ts);
+  return true;
+}
+
+#define DECIMAL_CONVERT_SLOT(DecimalValue)                                               \
+template <>                                                                              \
+inline bool ParquetDataConverter<DecimalValue, true>::ConvertSlot(                       \
+    const DecimalValue* src, void* slot) const {                                         \
+  return ConvertDecimalSlot(src, slot);                                                  \
+}
+
+DECIMAL_CONVERT_SLOT(Decimal4Value)
+DECIMAL_CONVERT_SLOT(Decimal8Value)
+DECIMAL_CONVERT_SLOT(Decimal16Value)
+
+template <typename InternalType, bool MATERIALIZED>
+inline bool ParquetDataConverter<InternalType, MATERIALIZED>::ConvertDecimalSlot(
+    const InternalType* src, void* slot) const {
+  // 'overflow' is required for ToDecimal*(), but we only allow higher precision in the
+  // table metadata than the file metadata, i.e. it should never overflow.
+  bool overflow = false;
+  switch (col_type_->GetByteSize()) {
+    case 4: {
+      auto dst_dec4 = reinterpret_cast<Decimal4Value*>(slot);
+      *dst_dec4 = ToDecimal4(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec4);
+    }
+    case 8: {
+      auto dst_dec8 = reinterpret_cast<Decimal8Value*>(slot);
+      *dst_dec8 = ToDecimal8(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec8);
+    }
+    case 16: {
+      auto dst_dec16 = reinterpret_cast<Decimal16Value*>(slot);
+      *dst_dec16 = ToDecimal16(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec16);
+    }
+    default:
+      DCHECK(false) << "Internal error: cannot handle decimals of precision "
+                    << col_type_->precision;
+      return false;
+  }
+}
+
+template <typename InternalType, bool MATERIALIZED>
+template <typename DecimalType>
+inline bool ParquetDataConverter<InternalType, MATERIALIZED>
+    ::ConvertDecimalScale(DecimalType* slot) const {
+  int parquet_file_scale = parquet_element_->scale;
+  int slot_scale = col_type_->scale;
+  if (LIKELY(parquet_file_scale == slot_scale)) return true;
+
+  bool overflow = false;
+  *slot = slot->ScaleTo(parquet_file_scale, slot_scale, col_type_->precision,
+                        /*round=*/true, &overflow);
+  if (UNLIKELY(overflow)) return false;
+  return true;
+}
+
+} // namespace impala
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 6de2e76..dfd6ce3 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -318,52 +318,24 @@ Status ParquetMetadataUtils::ValidateColumn(const char* filename,
   bool is_converted_type_decimal = schema_element.__isset.converted_type
       && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
   if (slot_desc->type().type == TYPE_DECIMAL) {
-    // TODO: allow converting to wider type (IMPALA-2515)
-    if (schema_element.type == parquet::Type::INT32 &&
-        sizeof(int32_t) != slot_desc->type().GetByteSize()) {
-      return Status(Substitute("File '$0' decimal column '$1' is stored as INT32, but "
-          "based on the precision in the table metadata, another type would needed.",
-          filename, schema_element.name));
-    }
-    if (schema_element.type == parquet::Type::INT64 &&
-        sizeof(int64_t) != slot_desc->type().GetByteSize()) {
-      return Status(Substitute("File '$0' decimal column '$1' is stored as INT64, but "
-          "based on the precision in the table metadata, another type would needed.",
-          filename, schema_element.name));
-    }
     // We require that the scale and byte length be set.
     if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
       if (!schema_element.__isset.type_length) {
         return Status(Substitute("File '$0' column '$1' does not have type_length set.",
             filename, schema_element.name));
       }
-
-      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
-      if (schema_element.type_length < expected_len) {
-        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
-            "Expecting: len >= $2 in file: $3", filename, schema_element.name,
-            expected_len, schema_element.type_length));
-      }
     }
     if (!schema_element.__isset.scale) {
       return Status(Substitute("File '$0' column '$1' does not have the scale set.",
           filename, schema_element.name));
     }
 
-    if (schema_element.scale != slot_desc->type().scale) {
-      // TODO: we could allow a mismatch and do a conversion at this step.
-      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
-          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
-          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
-    }
-
     // The other decimal metadata should be there but we don't need it.
     if (!schema_element.__isset.precision) {
       ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     } else {
-      if (schema_element.precision != slot_desc->type().precision) {
-        // TODO: we could allow a mismatch and do a conversion at this step.
+      if (schema_element.precision > slot_desc->type().precision) {
         ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
             schema_element.precision, slot_desc->type().precision);
         RETURN_IF_ERROR(state->LogOrReturnError(msg));
diff --git a/be/src/exprs/decimal-operators-ir.cc b/be/src/exprs/decimal-operators-ir.cc
index b0dbc54..d32e3bb 100644
--- a/be/src/exprs/decimal-operators-ir.cc
+++ b/be/src/exprs/decimal-operators-ir.cc
@@ -116,21 +116,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal4Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 8: {
       Decimal8Value val8 = ToDecimal8(val, &overflow);
       Decimal8Value scaled_val = val8.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
@@ -146,21 +146,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal8Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal8Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
@@ -176,21 +176,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal8Value val8 = ToDecimal8(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val8.value());
     }
     case 16: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index c2744e0..573e35c 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -110,8 +110,10 @@ class __attribute__ ((packed)) DecimalValue {
   /// Returns a new decimal scaled by from src_type to dst_type.
   /// e.g. If this value was 1100 at scale 3 and the dst_type had scale two, the
   /// result would be 110. (In both cases representing the decimal 1.1).
+  /// 'round' determines the behavior in case of scaling down, i.e. whether 11.56 should
+  /// be 11.5 or 11.6.
   inline DecimalValue ScaleTo(int src_scale, int dst_scale, int dst_precision,
-      bool* overflow) const;
+      bool round, bool* overflow) const;
 
   /// Implementations of the basic arithmetic operators. In all these functions,
   /// we take the precision and scale of both inputs. The return type is assumed
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index 3af1d7a..2ae5743 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -129,17 +129,19 @@ inline typename RESULT_T::underlying_type_t DecimalValue<T>::ToInt(int scale,
 
 template<typename T>
 inline DecimalValue<T> DecimalValue<T>::ScaleTo(int src_scale, int dst_scale,
-    int dst_precision, bool* overflow) const {
+    int dst_precision, bool round, bool* overflow) const {
   int delta_scale = src_scale - dst_scale;
   T result = value();
   T max_value = DecimalUtil::GetScaleMultiplier<T>(dst_precision);
   if (delta_scale >= 0) {
-    if (delta_scale != 0) result /= DecimalUtil::GetScaleMultiplier<T>(delta_scale);
+    if (delta_scale != 0) {
+      result = DecimalUtil::ScaleDownAndRound(result, delta_scale, round);
+    }
     // Even if we are decreasing the absolute unscaled value, we can still overflow.
     // This path is also used to convert between precisions so for example, converting
     // from 100 as decimal(3,0) to decimal(2,0) should be considered an overflow.
     *overflow |= abs(result) >= max_value;
-  } else if (delta_scale < 0) {
+  } else {
     T mult = DecimalUtil::GetScaleMultiplier<T>(-delta_scale);
     *overflow |= abs(result) >= max_value / mult;
     result = ArithmeticUtil::AsUnsigned<std::multiplies>(result, mult);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 70def18..5981ddc 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -118,16 +118,10 @@ public class IcebergCatalogOpExecutor {
    * Iceberg only supports these type conversions:
    *   INTEGER -> LONG
    *   FLOAT -> DOUBLE
-   *   DECIMAL(p1,s1) -> DECIMAL(p1,s2), same scale, p1<=p2 // But we disable DECIMAL
-   *                                                        // conversions due to
-   *                                                        // IMPALA-7087
+   *   DECIMAL(p1,s) -> DECIMAL(p2,s), same scale, p1<=p2
    */
   public static void alterColumn(FeIcebergTable feTable, String colName, TColumn newCol)
       throws TableLoadingException, ImpalaRuntimeException {
-    if (Type.fromThrift(newCol.getColumnType()).isDecimal()) {
-      // TODO: remove this if stmt when IMPALA-7087 is fixed.
-      throw new ImpalaRuntimeException("Cannot change DECIMAL column of Iceberg table.");
-    }
     UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
     org.apache.iceberg.types.Type type =
         IcebergSchemaConverter.fromImpalaColumnType(newCol.getColumnType());
diff --git a/testdata/data/README b/testdata/data/README
index cdf0fb8..db8bdd7 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -656,3 +656,8 @@ https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/or
 (65b95fb72be8f5a8a193a6f7bc4560fdcd742fc7).
 The schema was completely changed to allow us to test types supported in Parquet Bloom
 filters.
+
+binary_decimal_precision_and_scale_widening.parquet
+Parquet file written with schema (decimal(9,2), decimal(18,2), decimal(38,2)). The rows
+inside the file are carefully chosen so that they don't cause an overflow when being read
+by an Impala table with a higher precision/scale.
diff --git a/testdata/data/binary_decimal_precision_and_scale_widening.parquet b/testdata/data/binary_decimal_precision_and_scale_widening.parquet
new file mode 100644
index 0000000..040d788
Binary files /dev/null and b/testdata/data/binary_decimal_precision_and_scale_widening.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
index 937fb78..ed64e55 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
@@ -264,3 +264,24 @@ NULL,NULL
 ---- TYPES
 DECIMAL,BIGINT
 ====
+---- QUERY
+# Iceberg allows changing precision to a higher value.
+ALTER TABLE ice_alter_cols DROP COLUMN bi;
+ALTER TABLE ice_alter_cols CHANGE COLUMN x d DECIMAL (22, 3);
+SELECT * FROM ice_alter_cols;
+---- RESULTS
+1.618
+1.110
+2.345
+NULL
+NULL
+---- TYPES
+DECIMAL
+====
+---- QUERY
+DESCRIBE ice_alter_cols;
+---- RESULTS
+'d','decimal(22,3)','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 7aae524..2752e8f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -268,12 +268,6 @@ ALTER TABLE iceberg_table_hadoop_catalog REPLACE COLUMNS(level INT, register_tim
 AnalysisException: ALTER TABLE REPLACE COLUMNS is not supported on Iceberg tables.
 ====
 ---- QUERY
-CREATE TABLE test_decimal_conversion (d decimal(6,2)) STORED AS ICEBERG;
-ALTER TABLE test_decimal_conversion CHANGE COLUMN d d decimal(8,2);
----- CATCH
-Cannot change DECIMAL column of Iceberg table.
-====
----- QUERY
 CREATE TABLE iceberg_transactional(i int)
 STORED AS ICEBERG
 tblproperties('iceberg.catalog'='hadoop.tables', 'transactional'='true');
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test
new file mode 100644
index 0000000..a242737
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test
@@ -0,0 +1,236 @@
+====
+---- QUERY
+CREATE TABLE test_dec (d decimal(6, 3)) STORED AS PARQUET;
+INSERT INTO test_dec VALUES (23.633), (11.151), (-23.672), (-23.154);
+SELECT * FROM test_dec;
+---- RESULTS
+23.633
+11.151
+-23.672
+-23.154
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(8, 3);
+SELECT * FROM test_dec where d = 23.633;
+---- RESULTS
+23.633
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(8, 4);
+SELECT * FROM test_dec where d != 0;
+---- RESULTS
+23.6330
+11.1510
+-23.6720
+-23.1540
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 3);
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.672
+-23.154
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 5);
+SELECT * FROM test_dec where d > 23.63;
+---- RESULTS
+23.63300
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(32, 8);
+SELECT * FROM test_dec where d > 0;
+---- RESULTS
+23.63300000
+11.15100000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.67200000
+-23.15400000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 2);
+SELECT * FROM test_dec where d = 11.15;
+---- RESULTS
+11.15
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec;
+---- RESULTS
+23.63
+11.15
+-23.67
+-23.15
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 1);
+SELECT * FROM test_dec where d = 11.2;
+---- RESULTS
+11.2
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 1);
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.7
+-23.2
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d = -23.7;
+---- RESULTS
+-23.7
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(20, 0);
+SELECT * FROM test_dec where d = 24;
+---- RESULTS
+24
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d = -23;
+---- RESULTS
+-23
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(32, 8);
+INSERT INTO test_dec values (100000000.9999);
+SELECT * FROM test_dec where d > 0;
+---- RESULTS
+23.63300000
+11.15100000
+100000000.99990000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+# Test runtime filters with equi-joins.
+set parquet_page_row_count_limit=1;
+create table deci_left (d decimal(6,3)) sort by (d) stored as parquet;
+create table deci_right (d decimal (6, 3), b boolean) stored as parquet;
+insert into deci_left values (123.123), (222.566), (-123.971);
+insert into deci_right values (123.123, false), (222.566, true), (-123.97, true);
+====
+---- QUERY
+# At first only change the left side.
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal(6, 2);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-123.97,true
+---- TYPES
+DECIMAL,BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_right change column d d decimal(6, 2);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-123.97,true
+222.57,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = false;
+---- RESULTS
+123.12,false
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal (6, 1);
+alter table deci_right change column d d decimal (6, 1);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-124.0,true
+222.6,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = false;
+---- RESULTS
+123.1,false
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set parquet_page_row_count_limit=1;
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+insert into deci_left values (356.7), (-200.9);
+insert into deci_right values (356.7, true), (-200.9, true);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-124.0,true
+222.6,true
+-200.9,true
+356.7,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal (8, 4);
+alter table deci_right change column d d decimal (8, 4);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-200.9000,true
+356.7000,true
+222.5660,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
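The expected results in parquet-decimal-precision-and-scale-altering.test are consistent with re-expressing each stored decimal(6,3) value at the table's current scale, rounding to the nearest representable value when the scale shrinks. A minimal Python sketch of that arithmetic follows. It is an illustration only, not Impala's implementation: the helper name read_with_scale is hypothetical, and the tie-breaking rule (ROUND_HALF_UP) is an assumption, since none of the test values is an exact tie.

```python
from decimal import Decimal, ROUND_HALF_UP


def read_with_scale(stored: str, scale: int) -> Decimal:
    """Re-express a stored decimal at the given scale (hypothetical helper).

    ROUND_HALF_UP is an assumed tie-breaking rule; the test data above
    contains no exact ties, so it cannot confirm or refute it.
    """
    quantum = Decimal(1).scaleb(-scale)  # scale=2 gives Decimal("0.01")
    return Decimal(stored).quantize(quantum, rounding=ROUND_HALF_UP)


# Values inserted into test_dec while the column was decimal(6, 3).
stored_values = ["23.633", "11.151", "-23.672", "-23.154"]

for scale in (8, 2, 1, 0):
    print(scale, [str(read_with_scale(v, scale)) for v in stored_values])
```

Each read starts from the value as written in the file, so repeated ALTER TABLE statements do not accumulate rounding error in this model.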
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test
new file mode 100644
index 0000000..8bc22a8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test
@@ -0,0 +1,104 @@
+====
+---- QUERY
+select * from binary_decimal_precision_widening;
+---- RESULTS
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+99999.99,99999.99,99999.99
+0.00,99999999999999.99,9999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-99999.99,-99999.99,-99999.99
+0.00,-99999999999999.99,-9999999999999999999999999999999999.99
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_scale_widening;
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+0.0000,99999999999999.9900,9999999999999999999999999999999999.9900
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+-99999.9900,-99999.9900,-99999.9900
+0.0000,-99999999999999.9900,-9999999999999999999999999999999999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_precision_and_scale_widening;
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+0.0000,99999999999999.9900,9999999999999999999999999999999999.9900
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+-99999.9900,-99999.9900,-99999.9900
+0.0000,-99999999999999.9900,-9999999999999999999999999999999999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select med_dec * 100000, med_dec * 0.000001 from
+binary_decimal_scale_widening;
+---- RESULTS
+0.0000,0.0000000000
+25500000.0000,0.0002550000
+6553500000.0000,0.0655350000
+9999999000.0000,0.0999999900
+9999999999999999000.0000,99999999.9999999900
+-25500000.0000,-0.0002550000
+-6553500000.0000,-0.0655350000
+-9999999000.0000,-0.0999999900
+-9999999999999999000.0000,-99999999.9999999900
+---- TYPES
+DECIMAL,DECIMAL
+====
+---- QUERY
+select score from int32_decimal_precision_and_scale_widening
+---- RESULTS
+12.340000
+24.560000
+34.123000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+select score from int64_decimal_precision_and_scale_widening
+---- RESULTS
+12.34000000
+24.56000000
+34.12300000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+select * from scale_overflow
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+NULL,9999999.9900,9999999.9900
+0.0000,NULL,NULL
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+NULL,-9999999.9900,-9999999.9900
+0.0000,NULL,NULL
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_precision_and_scale_widening where small_dec > 0 and med_dec > 0 and large_dec > 0;
+---- RESULTS
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
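The scale_overflow results above can be read as a digit-budget check: a decimal(p, s) column has room for p - s integer digits, so a value whose integer part needs more digits than that comes back as NULL. The Python sketch below models that check under stated assumptions. The function name widen is hypothetical, the sketch only handles a target scale at least as large as the stored scale, and it assumes the small_dec column of binary_decimal_no_dictionary.parquet holds the same 9999999.99 value that the wider columns display. It is not the backend's conversion code.

```python
from decimal import Decimal, localcontext
from typing import Optional


def widen(stored: str, precision: int, scale: int) -> Optional[Decimal]:
    """Return the value at the wider scale, or None (NULL) on overflow.

    Hypothetical helper: assumes `scale` is >= the stored value's scale,
    so no rounding is ever needed.
    """
    value = Decimal(stored)
    _, digits, exponent = value.as_tuple()
    integer_digits = max(len(digits) + exponent, 0)
    if integer_digits > precision - scale:
        return None  # does not fit in decimal(precision, scale)
    with localcontext() as ctx:
        ctx.prec = 38  # the largest precision used by the tables above
        return value.quantize(Decimal(1).scaleb(-scale))


print(widen("99999.99", 9, 4))     # 5 integer digits fit in decimal(9,4)
print(widen("9999999.99", 9, 4))   # 7 integer digits do not fit
print(widen("9999999.99", 18, 4))  # fits in decimal(18,4)
```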
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6b4a83c..530e2d9 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1051,6 +1051,61 @@ class TestParquet(ImpalaTestSuite):
         else:
           assert summary.total_num_values == 11
 
+  def test_decimal_precision_and_scale_widening(self, vector, unique_database):
+    """IMPALA-7087: Tests that Parquet files stored with a lower precision or scale than
+       the table metadata can be read by Impala.
+    """
+    # The file binary_decimal_precision_and_scale_widening.parquet is written with
+    # schema (decimal(9,2), decimal(18,2), decimal(38,2)).
+    binary_decimal_test_files =\
+        ["testdata/data/binary_decimal_precision_and_scale_widening.parquet"]
+
+    # Test reading Parquet files when the table has a higher precision than the file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(38,2), med_dec decimal(38,2), large_dec decimal(38,2))
+        STORED AS PARQUET""", unique_database, "binary_decimal_precision_widening",
+        binary_decimal_test_files)
+
+    # Test reading Parquet files when the table has a higher scale than the file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(9,4), med_dec decimal(18,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database, "binary_decimal_scale_widening",
+        binary_decimal_test_files)
+
+    # Test reading Parquet files when the table has a higher precision and scale than the
+    # file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(38,4), med_dec decimal(38,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database,
+        "binary_decimal_precision_and_scale_widening", binary_decimal_test_files)
+
+    # Test Parquet precision and scale widening when decimals are stored as INT32
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (team string, score decimal(12, 6)) STORED AS PARQUET""", unique_database,
+        "int32_decimal_precision_and_scale_widening",
+        ["testdata/data/decimal_stored_as_int32.parquet"])
+
+    # Test Parquet precision and scale widening when decimals are stored as INT64
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (team string, score decimal(32, 8)) STORED AS PARQUET""", unique_database,
+        "int64_decimal_precision_and_scale_widening",
+        ["testdata/data/decimal_stored_as_int64.parquet"])
+
+    # Unlike the file binary_decimal_precision_and_scale_widening.parquet, not all the
+    # values in binary_decimal_no_dictionary.parquet can be converted to a higher scale
+    # without overflowing.
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(9,4), med_dec decimal(18,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database, "scale_overflow",
+        ["testdata/data/binary_decimal_no_dictionary.parquet"])
+
+    self.run_test_case("QueryTest/parquet-decimal-precision-and-scale-widening", vector,
+                       unique_database)
+
+  def test_decimal_precision_and_scale_altering(self, vector, unique_database):
+    self.run_test_case(
+        "QueryTest/parquet-decimal-precision-and-scale-altering", vector, unique_database)
+
 
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise: