You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/07/14 13:11:51 UTC

[impala] 03/03: IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9d46255739f94c53d686f670ee7b5981db59c148
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Jun 25 16:29:16 2021 +0200

    IMPALA-7087, IMPALA-8131: Read decimals from Parquet files with different precision/scale
    
    IMPALA-7087 is about reading Parquet decimal columns with lower
    precision/scale than table metadata.
    IMPALA-8131 is about reading Parquet decimal columns with higher
    scale than table metadata.
    
    Both are resolved by this patch. It reuses some parts from an
    earlier change request from Sahil Takiar:
    https://gerrit.cloudera.org/#/c/12163/
    
    A new utility class has been introduced, ParquetDataConverter which does
    the data conversion. It also helps to decide whether data conversion
    is needed or not.
    
    NULL values are returned in case of overflows. This behavior is
    consistent with Hive.
    
    Parquet column stats reader is also updated to convert the decimal
    values. The stats reader is used to evaluate min/max conjuncts. It
    works well because later we also evaluate the conjuncts on the
    converted values anyway.
    
    The status of different filterings:
     * dictionary filtering: disabled for columns that need conversion
     * runtime bloom filters: work on the converted values
     * runtime min/max filters: work on the converted values
    
    This patch also enables schema evolution of decimal columns of Iceberg
    tables.
    
    Testing:
     * added e2e tests
    
    Change-Id: Icefa7e545ca9f7df1741a2d1225375ecf54434da
    Reviewed-on: http://gerrit.cloudera.org:8080/17678
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/parquet/parquet-column-readers.cc      | 142 +++++--------
 be/src/exec/parquet/parquet-column-stats.cc        |  43 +++-
 be/src/exec/parquet/parquet-column-stats.h         |   9 +-
 be/src/exec/parquet/parquet-data-converter.h       | 184 ++++++++++++++++
 be/src/exec/parquet/parquet-metadata-utils.cc      |  30 +--
 be/src/exprs/decimal-operators-ir.cc               |  18 +-
 be/src/runtime/decimal-value.h                     |   4 +-
 be/src/runtime/decimal-value.inline.h              |   8 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   8 +-
 testdata/data/README                               |   5 +
 ...ry_decimal_precision_and_scale_widening.parquet | Bin 0 -> 984 bytes
 .../queries/QueryTest/iceberg-alter.test           |  21 ++
 .../queries/QueryTest/iceberg-negative.test        |   6 -
 ...rquet-decimal-precision-and-scale-altering.test | 236 +++++++++++++++++++++
 ...rquet-decimal-precision-and-scale-widening.test | 104 +++++++++
 tests/query_test/test_scanners.py                  |  55 +++++
 16 files changed, 718 insertions(+), 155 deletions(-)

diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index d34dbe3..86131ff 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -22,6 +22,7 @@
 
 #include "exec/parquet/hdfs-parquet-scanner.h"
 #include "exec/parquet/parquet-bool-decoder.h"
+#include "exec/parquet/parquet-data-converter.h"
 #include "exec/parquet/parquet-level-decoder.h"
 #include "exec/parquet/parquet-metadata-utils.h"
 #include "exec/scratch-tuple-batch.h"
@@ -208,10 +209,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
   /// must specialize this function.
-  inline bool NeedsConversionInline() const {
-    DCHECK(!needs_conversion_);
-    return false;
-  }
+  inline bool NeedsConversionInline() const;
 
   /// Similar to NeedsConversion(), most column readers do not require validation,
   /// so to avoid branches, we return constant false. In general, types where not
@@ -223,8 +221,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
   bool ConvertSlot(const InternalType* src, void* slot) {
-    DCHECK(false);
-    return false;
+    return data_converter_.ConvertSlot(src, slot);
   }
 
   /// Checks if 'val' is invalid, e.g. due to being out of the valid value range. If it
@@ -251,21 +248,22 @@ class ScalarColumnReader : public BaseScalarColumnReader {
         PrintThriftEnum(page_encoding_), col_chunk_reader_.stream()->file_offset());
   }
 
+  ParquetTimestampDecoder& GetTimestampDecoder() {
+    return data_converter_.timestamp_decoder();
+  }
+
   /// Dictionary decoder for decoding column values.
   DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
   bool dict_decoder_init_ = false;
 
-  /// true if decoded values must be converted before being written to an output tuple.
-  bool needs_conversion_ = false;
-
   /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
   /// the max length for VARCHAR columns. Unused otherwise.
   int fixed_len_size_;
 
-  /// Contains extra data needed for Timestamp decoding.
-  ParquetTimestampDecoder timestamp_decoder_;
+  /// Converts values if needed.
+  ParquetDataConverter<InternalType, MATERIALIZED> data_converter_;
 
   /// Contains extra state required to decode boolean values. Only initialised for
   /// BOOLEAN columns.
@@ -279,7 +277,9 @@ template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIAL
 ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
     HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
   : BaseScalarColumnReader(parent, node, slot_desc),
-    dict_decoder_(parent->scan_node_->mem_tracker()) {
+    dict_decoder_(parent->scan_node_->mem_tracker()),
+    data_converter_(node.element,
+                    MATERIALIZED ? &slot_desc->type() : nullptr) {
   if (!MATERIALIZED) {
     // We're not materializing any values, just counting them. No need (or ability) to
     // initialize state used to materialize values.
@@ -297,18 +297,43 @@ ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader
     fixed_len_size_ = -1;
   }
 
-  needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
-
   if (slot_desc_->type().type == TYPE_TIMESTAMP) {
-    timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
-    dict_decoder_.SetTimestampHelper(timestamp_decoder_);
-    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+    data_converter_.SetTimestampDecoder(parent->CreateTimestampDecoder(*node.element));
+    dict_decoder_.SetTimestampHelper(GetTimestampDecoder());
   }
   if (slot_desc_->type().type == TYPE_BOOLEAN) {
     bool_decoder_ = make_unique<ParquetBoolDecoder>();
   }
 }
 
+template <typename T>
+struct IsDecimalValue {
+  static constexpr bool value =
+    std::is_same<T, Decimal4Value>::value ||
+    std::is_same<T, Decimal8Value>::value ||
+    std::is_same<T, Decimal16Value>::value;
+};
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+inline bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
+::NeedsConversionInline() const {
+  //TODO: use constexpr ifs when we switch to C++17.
+  if /* constexpr */ (MATERIALIZED) {
+    if /* constexpr */ (IsDecimalValue<InternalType>::value) {
+      return data_converter_.NeedsConversion();
+    }
+    if /* constexpr */ (std::is_same<InternalType, TimestampValue>::value) {
+      return data_converter_.NeedsConversion();
+    }
+    if /* constexpr */ (std::is_same<InternalType, StringValue>::value &&
+                        PARQUET_TYPE == parquet::Type::BYTE_ARRAY) {
+      return data_converter_.NeedsConversion();
+    }
+  }
+  DCHECK(!data_converter_.NeedsConversion());
+  return false;
+}
+
 // TODO: consider performing filter selectivity checks in this function.
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 Status ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::InitDataPage(
@@ -675,14 +700,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
       reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
 
   if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return false;
-  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
+  if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr)) ||
+      (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot)))) {
     if (UNLIKELY(!parent_->parse_status_.ok())) return false;
     // The value is invalid but execution should continue - set the null indicator and
     // skip conversion.
     tuple->SetNull(null_indicator_offset_);
     return true;
   }
-  if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
   return true;
 }
 
@@ -709,16 +734,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadAndConver
   uint8_t* curr_tuple = tuple_mem;
   for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple += tuple_size) {
     Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
-    if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
+    if ((NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) ||
+        UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
       if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-      // The value is invalid but execution should continue - set the null indicator and
-      // skip conversion.
+      // The value or the conversion is invalid but execution should continue - set the
+      // null indicator.
       tuple->SetNull(null_indicator_offset_);
       continue;
     }
-    if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
-      return false;
-    }
   }
   return true;
 }
@@ -772,14 +795,15 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
 }
 
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
-// out to 'timestamp_decoder_'.
+// out to the timestamp decoder.
 template <>
 template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
     true>::DecodeValue<Encoding::PLAIN>(uint8_t** RESTRICT data,
     const uint8_t* RESTRICT data_end, TimestampValue* RESTRICT val) RESTRICT {
   DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-  int encoded_len = timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
+  int encoded_len =
+      GetTimestampDecoder().Decode<parquet::Type::INT64>(*data, data_end, val);
   if (UNLIKELY(encoded_len < 0)) {
     SetPlainDecodeError();
     return false;
@@ -834,14 +858,14 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValues(
 }
 
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to call
-// out to 'timestamp_decoder_'.
+// out to the timestamp decoder.
 template <>
 template <>
 bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
     true>::DecodeValues<Encoding::PLAIN>(int64_t stride, int64_t count,
     TimestampValue* RESTRICT out_vals) RESTRICT {
   DCHECK_EQ(page_encoding_, Encoding::PLAIN);
-  int64_t encoded_len = timestamp_decoder_.DecodeBatch<parquet::Type::INT64>(
+  int64_t encoded_len = GetTimestampDecoder().DecodeBatch<parquet::Type::INT64>(
       data_, data_end_, count, stride, out_vals);
   if (UNLIKELY(encoded_len < 0)) {
     SetPlainDecodeError();
@@ -881,64 +905,6 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions
 }
 
 template <>
-inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
-    true>::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
-    const StringValue* src, void* slot) {
-  DCHECK(slot_desc() != nullptr);
-  DCHECK(slot_desc()->type().type == TYPE_CHAR);
-  int char_len = slot_desc()->type().len;
-  int unpadded_len = min(char_len, src->len);
-  char* dst_char = reinterpret_cast<char*>(slot);
-  memcpy(dst_char, src->ptr, unpadded_len);
-  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
-  return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsConversionInline() const {
-  return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  // Conversion should only happen when this flag is enabled.
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(dst_ts);
-  return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
-    const TimestampValue* src, void* slot) {
-  DCHECK(timestamp_decoder_.NeedsConversion());
-  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
-  *dst_ts = *src;
-  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
-  // range. We should either validate after conversion or require conversion to produce an
-  // in-range value.
-  timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
-  return true;
-}
-
-template <>
 inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
 ::NeedsValidationInline() const {
   return true;
@@ -1457,11 +1423,9 @@ static ParquetColumnReader* CreateDecimalColumnReader(
       }
       break;
     case parquet::Type::INT32:
-      DCHECK_EQ(sizeof(Decimal4Value::StorageType), slot_desc->type().GetByteSize());
       return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
           parent, node, slot_desc);
     case parquet::Type::INT64:
-      DCHECK_EQ(sizeof(Decimal8Value::StorageType), slot_desc->type().GetByteSize());
       return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
           parent, node, slot_desc);
     default:
diff --git a/be/src/exec/parquet/parquet-column-stats.cc b/be/src/exec/parquet/parquet-column-stats.cc
index 94670d0..2ef42f2 100644
--- a/be/src/exec/parquet/parquet-column-stats.cc
+++ b/be/src/exec/parquet/parquet-column-stats.cc
@@ -21,6 +21,8 @@
 #include <cmath>
 #include <limits>
 
+#include "exec/parquet/parquet-data-converter.h"
+
 #include "common/names.h"
 
 namespace impala {
@@ -162,14 +164,14 @@ bool ColumnStatsReader::ReadFromString(StatsField stats_field,
     case TYPE_DECIMAL:
       switch (col_type_.GetByteSize()) {
         case 4:
-          return ColumnStats<Decimal4Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal4Value>(encoded_value,
+              static_cast<Decimal4Value*>(slot));
         case 8:
-          return ColumnStats<Decimal8Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal8Value>(encoded_value,
+              static_cast<Decimal8Value*>(slot));
         case 16:
-          return ColumnStats<Decimal16Value>::DecodePlainValue(encoded_value, slot,
-              element_.type);
+          return DecodeDecimal<Decimal16Value>(encoded_value,
+              static_cast<Decimal16Value*>(slot));
         }
       DCHECK(false) << "Unknown decimal byte size: " << col_type_.GetByteSize();
     case TYPE_DATE:
@@ -273,6 +275,8 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   int64_t pos = start_idx;
   InternalType* output = v;
+  ParquetDataConverter<InternalType, true> data_converter(&element_, &col_type_);
+  //TODO (IMPALA-10793): set timestamp decoder to correctly convert timestamps.
 
   /// We unroll the loop manually in batches of 8.
   constexpr int batch = 8;
@@ -281,9 +285,12 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   for (int64_t b = 0; b < full_batches; b++) {
 #pragma push_macro("DECODE_NO_CHECK_UNROLL")
-#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2) \
-    ColumnStats<InternalType>::DecodePlainValue( \
-        source[pos + i], output + i, PARQUET_TYPE);
+#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2)           \
+    ColumnStats<InternalType>::DecodePlainValue(              \
+        source[pos + i], output + i, PARQUET_TYPE);           \
+    if (UNLIKELY(data_converter.NeedsConversion())) {         \
+      data_converter.ConvertSlot(output + i, output + i);     \
+    }
 
     BOOST_PP_REPEAT_FROM_TO(0, 8 /* The value of `batch` */,
         DECODE_NO_CHECK_UNROLL, ignore);
@@ -295,6 +302,9 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheck(
 
   for (; pos < num_values; ++pos) {
     ColumnStats<InternalType>::DecodePlainValue(source[pos], output, PARQUET_TYPE);
+    if (UNLIKELY(data_converter.NeedsConversion())) {
+      data_converter.ConvertSlot(output, output);
+    }
     output++;
   }
 
@@ -340,6 +350,21 @@ inline int64_t ColumnStatsReader::DecodeBatchOneBoundsCheckFastTrack(
   return num_values;
 }
 
+template <typename DecimalType>
+bool ColumnStatsReader::DecodeDecimal(const std::string& stat_value,
+    DecimalType* slot) const {
+  bool ret = ColumnStats<DecimalType>::DecodePlainValue(stat_value, slot,
+      element_.type);
+  if (!ret) return false;
+  ParquetDataConverter<DecimalType, true> data_converter(&element_, &col_type_);
+  if (LIKELY(!data_converter.NeedsConversion())) return true;
+  // Let's convert the decimal value to the table's decimal type. It's OK to evaluate
+  // filters and min/max conjuncts against the converted values as later we'd also
+  // use the converted values anyways.
+  // No need for an extra buffer, we can do the conversion in-place.
+  return data_converter.ConvertSlot(slot, slot);
+}
+
 bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value,
     ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const {
   bool stats_read = false;
diff --git a/be/src/exec/parquet/parquet-column-stats.h b/be/src/exec/parquet/parquet-column-stats.h
index 996ae04..5615d15 100644
--- a/be/src/exec/parquet/parquet-column-stats.h
+++ b/be/src/exec/parquet/parquet-column-stats.h
@@ -252,8 +252,9 @@ public:
   /// the minimum or maximum value.
   enum class StatsField { MIN, MAX };
 
-  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,  const ColumnType& col_type,
-      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& element)
+  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,
+      const ColumnType& col_type, const parquet::ColumnOrder* col_order,
+      const parquet::SchemaElement& element)
   : col_chunk_(col_chunk),
     col_type_(col_type),
     col_order_(col_order),
@@ -347,6 +348,10 @@ private:
   /// order 'col_order_'. Otherwise, returns false.
   bool CanUseDeprecatedStats() const;
 
+  /// Decodes decimal value into slot. Does conversion if needed.
+  template <typename DecimalType>
+  bool DecodeDecimal(const std::string& stat_value, DecimalType* slot) const;
+
   /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
   /// necessary. Returns true if the decoding and conversions were successful.
   bool DecodeTimestamp(const std::string& stat_value,
diff --git a/be/src/exec/parquet/parquet-data-converter.h b/be/src/exec/parquet/parquet-data-converter.h
new file mode 100644
index 0000000..feb0cab
--- /dev/null
+++ b/be/src/exec/parquet/parquet-data-converter.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+
+#include "gen-cpp/parquet_types.h"
+
+#include "runtime/decimal-value.inline.h"
+#include "runtime/runtime-state.h"
+
+namespace impala {
+
+/// Utility class for converting Parquet data to the proper data type that is used
+/// in the tuple's slot.
+template <typename InternalType, bool MATERIALIZED>
+class ParquetDataConverter {
+ public:
+  ParquetDataConverter(const parquet::SchemaElement* elem, const ColumnType* col_type) :
+      parquet_element_(elem), col_type_(col_type) {
+    needs_conversion_ = CheckIfNeedsConversion();
+  }
+
+  /// Converts and writes 'src' into 'slot' while doing the necessary conversions
+  /// It shouldn't be invoked when conversion is not needed.
+  bool ConvertSlot(const InternalType* src, void* slot) const {
+    DCHECK(false);
+    return false;
+  }
+
+  bool NeedsConversion() const { return needs_conversion_; }
+
+  /// Sets extra information that is only needed for decoding TIMESTAMPs.
+  void SetTimestampDecoder(const ParquetTimestampDecoder& timestamp_decoder) {
+    DCHECK_EQ(col_type_->type, TYPE_TIMESTAMP);
+    timestamp_decoder_ = timestamp_decoder;
+    needs_conversion_ = timestamp_decoder_.NeedsConversion();
+  }
+
+  ParquetTimestampDecoder& timestamp_decoder() {
+    return timestamp_decoder_;
+  }
+
+ private:
+  /// Returns true if we need to do a conversion from the Parquet type to the slot type.
+  bool CheckIfNeedsConversion() {
+    if (!MATERIALIZED) return false;
+    if (col_type_->type == TYPE_TIMESTAMP) {
+      return timestamp_decoder_.NeedsConversion();
+    }
+    if (col_type_->type == TYPE_CHAR) {
+      return true;
+    }
+    if (col_type_->type == TYPE_DECIMAL) {
+      if (col_type_->precision != parquet_element_->precision) {
+        // Decimal values can be stored by Decimal4Value (4 bytes), Decimal8Value, and
+        // Decimal16Value. We only need to do a conversion for different precision if
+        // the values require different types (different byte size).
+        if (ColumnType::GetDecimalByteSize(parquet_element_->precision)
+            != col_type_->GetByteSize()) {
+          return true;
+        }
+      }
+      // Different scales require decimal conversion.
+      if (col_type_->scale != parquet_element_->scale) return true;
+    }
+    return false;
+  }
+
+  /// Converts decimal slot to proper precision/scale.
+  bool ConvertDecimalSlot(const InternalType* src, void* slot) const;
+  template <typename DecimalType>
+  bool ConvertDecimalScale(DecimalType* slot) const;
+
+  /// Parquet schema node of the field.
+  const parquet::SchemaElement* parquet_element_;
+  /// Impala slot descriptor of the field.
+  const ColumnType* col_type_;
+  /// Contains extra data needed for Timestamp decoding.
+  ParquetTimestampDecoder timestamp_decoder_;
+  /// True if the slot needs conversion.
+  bool needs_conversion_ = false;
+};
+
+template <>
+inline bool ParquetDataConverter<StringValue, true>::ConvertSlot(
+    const StringValue* src, void* slot) const {
+  DCHECK_EQ(col_type_->type, TYPE_CHAR);
+  int char_len = col_type_->len;
+  int unpadded_len = std::min(char_len, src->len);
+  char* dst_char = reinterpret_cast<char*>(slot);
+  memcpy(dst_char, src->ptr, unpadded_len);
+  StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
+  return true;
+}
+
+template <>
+inline bool ParquetDataConverter<TimestampValue, true>::ConvertSlot(
+    const TimestampValue* src, void* slot) const {
+  // Conversion should only happen when this flag is enabled.
+  DCHECK(timestamp_decoder_.NeedsConversion());
+  TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
+  *dst_ts = *src;
+  // TODO: IMPALA-7862: converting timestamps after validating them can move them out of
+  // range. We should either validate after conversion or require conversion to produce an
+  // in-range value.
+  timestamp_decoder_.ConvertToLocalTime(dst_ts);
+  return true;
+}
+
+#define DECIMAL_CONVERT_SLOT(DecimalValue)                                               \
+template <>                                                                              \
+inline bool ParquetDataConverter<DecimalValue, true>::ConvertSlot(                       \
+    const DecimalValue* src, void* slot) const {                                         \
+  return ConvertDecimalSlot(src, slot);                                                  \
+}
+
+DECIMAL_CONVERT_SLOT(Decimal4Value)
+DECIMAL_CONVERT_SLOT(Decimal8Value)
+DECIMAL_CONVERT_SLOT(Decimal16Value)
+
+template <typename InternalType, bool MATERIALIZED>
+inline bool ParquetDataConverter<InternalType, MATERIALIZED>::ConvertDecimalSlot(
+    const InternalType* src, void* slot) const {
+  // 'overflow' is required for ToDecimal*(), but we only allow higher precision in the
+  // table metadata than the file metadata, i.e. it should never overflow.
+  bool overflow = false;
+  switch (col_type_->GetByteSize()) {
+    case 4: {
+      auto dst_dec4 = reinterpret_cast<Decimal4Value*>(slot);
+      *dst_dec4 = ToDecimal4(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec4);
+    }
+    case 8: {
+      auto dst_dec8 = reinterpret_cast<Decimal8Value*>(slot);
+      *dst_dec8 = ToDecimal8(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec8);
+    }
+    case 16: {
+      auto dst_dec16 = reinterpret_cast<Decimal16Value*>(slot);
+      *dst_dec16 = ToDecimal16(*src, &overflow);
+      DCHECK(!overflow);
+      return ConvertDecimalScale(dst_dec16);
+    }
+    default:
+      DCHECK(false) << "Internal error: cannot handle decimals of precision "
+                    << col_type_->precision;
+      return false;
+  }
+}
+
+template <typename InternalType, bool MATERIALIZED>
+template <typename DecimalType>
+inline bool ParquetDataConverter<InternalType, MATERIALIZED>
+    ::ConvertDecimalScale(DecimalType* slot) const {
+  int parquet_file_scale = parquet_element_->scale;
+  int slot_scale = col_type_->scale;
+  if (LIKELY(parquet_file_scale == slot_scale)) return true;
+
+  bool overflow = false;
+  *slot = slot->ScaleTo(parquet_file_scale, slot_scale, col_type_->precision,
+                        /*round=*/true, &overflow);
+  if (UNLIKELY(overflow)) return false;
+  return true;
+}
+
+} // namespace impala
diff --git a/be/src/exec/parquet/parquet-metadata-utils.cc b/be/src/exec/parquet/parquet-metadata-utils.cc
index 6de2e76..dfd6ce3 100644
--- a/be/src/exec/parquet/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet/parquet-metadata-utils.cc
@@ -318,52 +318,24 @@ Status ParquetMetadataUtils::ValidateColumn(const char* filename,
   bool is_converted_type_decimal = schema_element.__isset.converted_type
       && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
   if (slot_desc->type().type == TYPE_DECIMAL) {
-    // TODO: allow converting to wider type (IMPALA-2515)
-    if (schema_element.type == parquet::Type::INT32 &&
-        sizeof(int32_t) != slot_desc->type().GetByteSize()) {
-      return Status(Substitute("File '$0' decimal column '$1' is stored as INT32, but "
-          "based on the precision in the table metadata, another type would needed.",
-          filename, schema_element.name));
-    }
-    if (schema_element.type == parquet::Type::INT64 &&
-        sizeof(int64_t) != slot_desc->type().GetByteSize()) {
-      return Status(Substitute("File '$0' decimal column '$1' is stored as INT64, but "
-          "based on the precision in the table metadata, another type would needed.",
-          filename, schema_element.name));
-    }
     // We require that the scale and byte length be set.
     if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
       if (!schema_element.__isset.type_length) {
         return Status(Substitute("File '$0' column '$1' does not have type_length set.",
             filename, schema_element.name));
       }
-
-      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
-      if (schema_element.type_length < expected_len) {
-        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
-            "Expecting: len >= $2 in file: $3", filename, schema_element.name,
-            expected_len, schema_element.type_length));
-      }
     }
     if (!schema_element.__isset.scale) {
       return Status(Substitute("File '$0' column '$1' does not have the scale set.",
           filename, schema_element.name));
     }
 
-    if (schema_element.scale != slot_desc->type().scale) {
-      // TODO: we could allow a mismatch and do a conversion at this step.
-      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
-          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
-          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
-    }
-
     // The other decimal metadata should be there but we don't need it.
     if (!schema_element.__isset.precision) {
       ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     } else {
-      if (schema_element.precision != slot_desc->type().precision) {
-        // TODO: we could allow a mismatch and do a conversion at this step.
+      if (schema_element.precision > slot_desc->type().precision) {
         ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
             schema_element.precision, slot_desc->type().precision);
         RETURN_IF_ERROR(state->LogOrReturnError(msg));
diff --git a/be/src/exprs/decimal-operators-ir.cc b/be/src/exprs/decimal-operators-ir.cc
index b0dbc54..d32e3bb 100644
--- a/be/src/exprs/decimal-operators-ir.cc
+++ b/be/src/exprs/decimal-operators-ir.cc
@@ -116,21 +116,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal4Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 8: {
       Decimal8Value val8 = ToDecimal8(val, &overflow);
       Decimal8Value scaled_val = val8.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
@@ -146,21 +146,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal8Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal8Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
     case 16: {
       Decimal16Value val16 = ToDecimal16(val, &overflow);
       Decimal16Value scaled_val = val16.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
@@ -176,21 +176,21 @@ IR_ALWAYS_INLINE DecimalVal DecimalOperators::ScaleDecimalValue(FunctionContext*
   switch (ColumnType::GetDecimalByteSize(output_precision)) {
     case 4: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal4Value val4 = ToDecimal4(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val4.value());
     }
     case 8: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       Decimal8Value val8 = ToDecimal8(scaled_val, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(val8.value());
     }
     case 16: {
       Decimal16Value scaled_val = val.ScaleTo(
-          val_scale, output_scale, output_precision, &overflow);
+          val_scale, output_scale, output_precision, /*round=*/false, &overflow);
       RETURN_IF_OVERFLOW(ctx, overflow, DecimalVal);
       return DecimalVal(scaled_val.value());
     }
diff --git a/be/src/runtime/decimal-value.h b/be/src/runtime/decimal-value.h
index c2744e0..573e35c 100644
--- a/be/src/runtime/decimal-value.h
+++ b/be/src/runtime/decimal-value.h
@@ -110,8 +110,10 @@ class __attribute__ ((packed)) DecimalValue {
   /// Returns a new decimal scaled by from src_type to dst_type.
   /// e.g. If this value was 1100 at scale 3 and the dst_type had scale two, the
   /// result would be 110. (In both cases representing the decimal 1.1).
+  /// 'round' determines the behavior in case of scaling down, i.e. whether 11.56 should
+  /// be 11.5 or 11.6.
   inline DecimalValue ScaleTo(int src_scale, int dst_scale, int dst_precision,
-      bool* overflow) const;
+      bool round, bool* overflow) const;
 
   /// Implementations of the basic arithmetic operators. In all these functions,
   /// we take the precision and scale of both inputs. The return type is assumed
diff --git a/be/src/runtime/decimal-value.inline.h b/be/src/runtime/decimal-value.inline.h
index 3af1d7a..2ae5743 100644
--- a/be/src/runtime/decimal-value.inline.h
+++ b/be/src/runtime/decimal-value.inline.h
@@ -129,17 +129,19 @@ inline typename RESULT_T::underlying_type_t DecimalValue<T>::ToInt(int scale,
 
 template<typename T>
 inline DecimalValue<T> DecimalValue<T>::ScaleTo(int src_scale, int dst_scale,
-    int dst_precision, bool* overflow) const {
+    int dst_precision, bool round, bool* overflow) const {
   int delta_scale = src_scale - dst_scale;
   T result = value();
   T max_value = DecimalUtil::GetScaleMultiplier<T>(dst_precision);
   if (delta_scale >= 0) {
-    if (delta_scale != 0) result /= DecimalUtil::GetScaleMultiplier<T>(delta_scale);
+    if (delta_scale != 0) {
+      result = DecimalUtil::ScaleDownAndRound(result, delta_scale, round);
+    }
     // Even if we are decreasing the absolute unscaled value, we can still overflow.
     // This path is also used to convert between precisions so for example, converting
     // from 100 as decimal(3,0) to decimal(2,0) should be considered an overflow.
     *overflow |= abs(result) >= max_value;
-  } else if (delta_scale < 0) {
+  } else {
     T mult = DecimalUtil::GetScaleMultiplier<T>(-delta_scale);
     *overflow |= abs(result) >= max_value / mult;
     result = ArithmeticUtil::AsUnsigned<std::multiplies>(result, mult);
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 70def18..5981ddc 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -118,16 +118,10 @@ public class IcebergCatalogOpExecutor {
    * Iceberg only supports these type conversions:
    *   INTEGER -> LONG
    *   FLOAT -> DOUBLE
-   *   DECIMAL(p1,s1) -> DECIMAL(p1,s2), same scale, p1<=p2 // But we disable DECIMAL
-   *                                                        // conversions due to
-   *                                                        // IMPALA-7087
+   *   DECIMAL(p1,s1) -> DECIMAL(p1,s2), same scale, p1<=p2
    */
   public static void alterColumn(FeIcebergTable feTable, String colName, TColumn newCol)
       throws TableLoadingException, ImpalaRuntimeException {
-    if (Type.fromThrift(newCol.getColumnType()).isDecimal()) {
-      // TODO: remove this if stmt when IMPALA-7087 is fixed.
-      throw new ImpalaRuntimeException("Cannot change DECIMAL column of Iceberg table.");
-    }
     UpdateSchema schema = IcebergUtil.getIcebergUpdateSchema(feTable);
     org.apache.iceberg.types.Type type =
         IcebergSchemaConverter.fromImpalaColumnType(newCol.getColumnType());
diff --git a/testdata/data/README b/testdata/data/README
index cdf0fb8..db8bdd7 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -656,3 +656,8 @@ https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/test/java/or
 (65b95fb72be8f5a8a193a6f7bc4560fdcd742fc7).
 The schema was completely changed to allow us to test types supported in Parquet Bloom
 filters.
+
+binary_decimal_precision_and_scale_widening.parquet
+Parquet file written with schema (decimal(9,2), decimal(18,2), decimal(38,2)). The rows
+inside the file are carefully chosen so that they don't cause an overflow when being read
+by an Impala table with a higher precision/scale.
diff --git a/testdata/data/binary_decimal_precision_and_scale_widening.parquet b/testdata/data/binary_decimal_precision_and_scale_widening.parquet
new file mode 100644
index 0000000..040d788
Binary files /dev/null and b/testdata/data/binary_decimal_precision_and_scale_widening.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
index 937fb78..ed64e55 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
@@ -264,3 +264,24 @@ NULL,NULL
 ---- TYPES
 DECIMAL,BIGINT
 ====
+---- QUERY
+# Iceberg allows changing precision to a higher value.
+ALTER TABLE ice_alter_cols DROP COLUMN bi;
+ALTER TABLE ice_alter_cols CHANGE COLUMN x d DECIMAL (22, 3);
+SELECT * FROM ice_alter_cols;
+---- RESULTS
+1.618
+1.110
+2.345
+NULL
+NULL
+---- TYPES
+DECIMAL
+====
+---- QUERY
+DESCRIBE ice_alter_cols;
+---- RESULTS
+'d','decimal(22,3)','','true'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 7aae524..2752e8f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -268,12 +268,6 @@ ALTER TABLE iceberg_table_hadoop_catalog REPLACE COLUMNS(level INT, register_tim
 AnalysisException: ALTER TABLE REPLACE COLUMNS is not supported on Iceberg tables.
 ====
 ---- QUERY
-CREATE TABLE test_decimal_conversion (d decimal(6,2)) STORED AS ICEBERG;
-ALTER TABLE test_decimal_conversion CHANGE COLUMN d d decimal(8,2);
----- CATCH
-Cannot change DECIMAL column of Iceberg table.
-====
----- QUERY
 CREATE TABLE iceberg_transactional(i int)
 STORED AS ICEBERG
 tblproperties('iceberg.catalog'='hadoop.tables', 'transactional'='true');
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test
new file mode 100644
index 0000000..a242737
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-altering.test
@@ -0,0 +1,236 @@
+====
+---- QUERY
+CREATE TABLE test_dec (d decimal(6, 3)) STORED AS PARQUET;
+INSERT INTO test_dec VALUES (23.633), (11.151), (-23.672), (-23.154);
+SELECT * FROM test_dec;
+---- RESULTS
+23.633
+11.151
+-23.672
+-23.154
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(8, 3);
+SELECT * FROM test_dec where d = 23.633;
+---- RESULTS
+23.633
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(8, 4);
+SELECT * FROM test_dec where d != 0;
+---- RESULTS
+23.6330
+11.1510
+-23.6720
+-23.1540
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 3);
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.672
+-23.154
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 5);
+SELECT * FROM test_dec where d > 23.63;
+---- RESULTS
+23.63300
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(32, 8);
+SELECT * FROM test_dec where d > 0;
+---- RESULTS
+23.63300000
+11.15100000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.67200000
+-23.15400000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 2);
+SELECT * FROM test_dec where d = 11.15;
+---- RESULTS
+11.15
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec;
+---- RESULTS
+23.63
+11.15
+-23.67
+-23.15
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 1);
+SELECT * FROM test_dec where d = 11.2;
+---- RESULTS
+11.2
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(10, 1);
+SELECT * FROM test_dec where d < 0;
+---- RESULTS
+-23.7
+-23.2
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d = -23.7;
+---- RESULTS
+-23.7
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(20, 0);
+SELECT * FROM test_dec where d = 24;
+---- RESULTS
+24
+---- TYPES
+DECIMAL
+====
+---- QUERY
+SELECT * FROM test_dec where d = -23;
+---- RESULTS
+-23
+---- TYPES
+DECIMAL
+====
+---- QUERY
+ALTER TABLE test_dec CHANGE COLUMN d d DECIMAL(32, 8);
+INSERT INTO test_dec values (100000000.9999);
+SELECT * FROM test_dec where d > 0;
+---- RESULTS
+23.63300000
+11.15100000
+100000000.99990000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+# Test runtime filters with equi-joins.
+set parquet_page_row_count_limit=1;
+create table deci_left (d decimal(6,3)) sort by (d) stored as parquet;
+create table deci_right (d decimal (6, 3), b boolean) stored as parquet;
+insert into deci_left values (123.123), (222.566), (-123.971);
+insert into deci_right values (123.123, false), (222.566, true), (-123.97, true);
+====
+---- QUERY
+# At first only change the left side.
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal(6, 2);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-123.97,true
+---- TYPES
+DECIMAL,BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_right change column d d decimal(6, 2);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-123.97,true
+222.57,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = false;
+---- RESULTS
+123.12,false
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal (6, 1);
+alter table deci_right change column d d decimal (6, 1);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-124.0,true
+222.6,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = false;
+---- RESULTS
+123.1,false
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set parquet_page_row_count_limit=1;
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+insert into deci_left values (356.7), (-200.9);
+insert into deci_right values (356.7, true), (-200.9, true);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-124.0,true
+222.6,true
+-200.9,true
+356.7,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
+---- QUERY
+set minmax_filter_threshold=1.0;
+set minmax_filtering_level=row;
+alter table deci_left change column d d decimal (8, 4);
+alter table deci_right change column d d decimal (8, 4);
+select deci_left.d, deci_right.b
+from deci_left join deci_right on (deci_left.d = deci_right.d)
+where b = true;
+---- RESULTS
+-200.9000,true
+356.7000,true
+222.5660,true
+---- TYPES
+DECIMAL, BOOLEAN
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test
new file mode 100644
index 0000000..8bc22a8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-precision-and-scale-widening.test
@@ -0,0 +1,104 @@
+====
+---- QUERY
+select * from binary_decimal_precision_widening;
+---- RESULTS
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+99999.99,99999.99,99999.99
+0.00,99999999999999.99,9999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-99999.99,-99999.99,-99999.99
+0.00,-99999999999999.99,-9999999999999999999999999999999999.99
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_scale_widening;
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+0.0000,99999999999999.9900,9999999999999999999999999999999999.9900
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+-99999.9900,-99999.9900,-99999.9900
+0.0000,-99999999999999.9900,-9999999999999999999999999999999999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_precision_and_scale_widening;
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+0.0000,99999999999999.9900,9999999999999999999999999999999999.9900
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+-99999.9900,-99999.9900,-99999.9900
+0.0000,-99999999999999.9900,-9999999999999999999999999999999999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select med_dec * 100000, med_dec * 0.000001 from
+binary_decimal_scale_widening;
+---- RESULTS
+0.0000,0.0000000000
+25500000.0000,0.0002550000
+6553500000.0000,0.0655350000
+9999999000.0000,0.0999999900
+9999999999999999000.0000,99999999.9999999900
+-25500000.0000,-0.0002550000
+-6553500000.0000,-0.0655350000
+-9999999000.0000,-0.0999999900
+-9999999999999999000.0000,-99999999.9999999900
+---- TYPES
+DECIMAL,DECIMAL
+====
+---- QUERY
+select score from int32_decimal_precision_and_scale_widening
+---- RESULTS
+12.340000
+24.560000
+34.123000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+select score from int64_decimal_precision_and_scale_widening
+---- RESULTS
+12.34000000
+24.56000000
+34.12300000
+---- TYPES
+DECIMAL
+====
+---- QUERY
+select * from scale_overflow
+---- RESULTS
+0.0000,0.0000,0.0000
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+NULL,9999999.9900,9999999.9900
+0.0000,NULL,NULL
+-255.0000,-255.0000,-255.0000
+-65535.0000,-65535.0000,-65535.0000
+NULL,-9999999.9900,-9999999.9900
+0.0000,NULL,NULL
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
+---- QUERY
+select * from binary_decimal_precision_and_scale_widening where small_dec > 0 and med_dec > 0 and large_dec > 0;
+---- RESULTS
+255.0000,255.0000,255.0000
+65535.0000,65535.0000,65535.0000
+99999.9900,99999.9900,99999.9900
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6b4a83c..530e2d9 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1051,6 +1051,61 @@ class TestParquet(ImpalaTestSuite):
         else:
           assert summary.total_num_values == 11
 
+  def test_decimal_precision_and_scale_widening(self, vector, unique_database):
+    """IMPALA-7087: Tests that Parquet files stored with a lower precision or scale than
+       the table metadata can be read by Impala.
+    """
+    # The file binary_decimal_precision_widening is written with schema (decimal(9,2),
+    # decimal(18,2), decimal(38,2))
+    binary_decimal_test_files =\
+        ["testdata/data/binary_decimal_precision_and_scale_widening.parquet"]
+
+    # Test reading Parquet files when the table has a higher precision than the file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(38,2), med_dec decimal(38,2), large_dec decimal(38,2))
+        STORED AS PARQUET""", unique_database, "binary_decimal_precision_widening",
+        binary_decimal_test_files)
+
+    # Test reading Parquet files when the table has a higher scale than the file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(9,4), med_dec decimal(18,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database, "binary_decimal_scale_widening",
+        binary_decimal_test_files)
+
+    # Test reading Parquet files when the table has a higher precision and scale than the
+    # file
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(38,4), med_dec decimal(38,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database,
+        "binary_decimal_precision_and_scale_widening", binary_decimal_test_files)
+
+    # Test Parquet precision and scale widening when decimals are stored as INT32
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (team string, score decimal(12, 6)) STORED AS PARQUET""", unique_database,
+        "int32_decimal_precision_and_scale_widening",
+        ["testdata/data/decimal_stored_as_int32.parquet"])
+
+    # Test Parquet precision and scale widening when decimals are stored as INT64
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (team string, score decimal(32, 8)) STORED AS PARQUET""", unique_database,
+        "int64_decimal_precision_and_scale_widening",
+        ["testdata/data/decimal_stored_as_int64.parquet"])
+
+    # Unlike the file binary_decimal_precision_and_scale_widening.parquet, all the values
+    # in binary_decimal_no_dictionary.parquet cannot be converted to a higher scale
+    # without overflowing
+    create_table_and_copy_files(self.client, """create table if not exists {db}.{tbl}
+        (small_dec decimal(9,4), med_dec decimal(18,4), large_dec decimal(38,4))
+        STORED AS PARQUET""", unique_database, "scale_overflow",
+        ["testdata/data/binary_decimal_no_dictionary.parquet"])
+
+    self.run_test_case("QueryTest/parquet-decimal-precision-and-scale-widening", vector,
+                       unique_database)
+
+  def test_decimal_precision_and_scale_altering(self, vector, unique_database):
+    self.run_test_case(
+        "QueryTest/parquet-decimal-precision-and-scale-altering", vector, unique_database)
+
 
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise: