Posted to commits@impala.apache.org by bo...@apache.org on 2021/09/06 09:31:23 UTC

[impala] branch master updated: IMPALA-10879: Add parquet stats to iceberg manifest

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c8aa579  IMPALA-10879: Add parquet stats to iceberg manifest
c8aa579 is described below

commit c8aa5796d93510723342055cc70cf8d00abae754
Author: Attila Jeges <at...@cloudera.com>
AuthorDate: Fri Aug 6 16:51:57 2021 +0200

    IMPALA-10879: Add parquet stats to iceberg manifest
    
    This patch adds parquet stats to the iceberg manifest as per-datafile
    metrics.
    
    The following metrics are supported:
    - column_sizes :
      Map from column id to the total size on disk of all regions that
      store the column. Does not include bytes necessary to read other
      columns, like footers.
    
    - null_value_counts :
      Map from column id to number of null values in the column.
    
    - lower_bounds :
      Map from column id to lower bound in the column serialized as
      binary. Each value must be less than or equal to all non-null,
      non-NaN values in the column for the file.
    
    - upper_bounds :
      Map from column id to upper bound in the column serialized as
      binary. Each value must be greater than or equal to all non-null,
      non-NaN values in the column for the file.
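
    As an illustration (values borrowed from the new e2e test added in
    this patch; the maps are keyed by Iceberg field id), the metrics
    recorded for a single data file could look like:
      column_sizes      : { 1: 66, 2: 56, ... }
      null_value_counts : { 1: 1,  2: 1, ... }
      lower_bounds      : { 1: 'abc', 2: 0x00000000 (INT 0), ... }
      upper_bounds      : { 1: 'ghij', 2: 0x03000000 (INT 3), ... }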
    
    The corresponding parquet stats are collected by 'ColumnStats'
    (in 'min_value_', 'max_value_', 'null_count_' members) and
    'HdfsParquetTableWriter::BaseColumnWriter' (in
    'total_compressed_byte_size_' member).
    
    Testing:
    - New e2e test was added to verify that the metrics are written to the
      Iceberg manifest upon inserting data.
    - New e2e test was added to verify that lower_bounds/upper_bounds
      metrics are used to prune data files when querying iceberg tables.
    - Existing e2e tests were updated to work with the new behavior.
    - BE test for single-value serialization.
    
    Relevant Iceberg documentation:
    - Manifest:
      https://iceberg.apache.org/spec/#manifests
    - Values in lower_bounds and upper_bounds maps should be Single-value
      serialized to binary:
      https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
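
    For reference, a few Single-value serialized encodings (these byte
    patterns also show up in the new e2e test assertions):
      INT 3                  -> 03 00 00 00 (4-byte little-endian)
      STRING 'abc'           -> 61 62 63 (UTF-8 bytes, no length prefix)
      DECIMAL(10,3) -10.000  -> D8 F0 (unscaled -10000 as minimum-length
                                big-endian two's complement)
    BIGINT, FLOAT, DOUBLE, DATE and TIMESTAMP (microseconds since epoch)
    are likewise written as fixed-width little-endian values of their
    respective sizes.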
    
    Change-Id: Ic31f2260bc6f6a7f307ac955ff05eb154917675b
    Reviewed-on: http://gerrit.cloudera.org:8080/17806
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Attila Jeges <at...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc                     |   3 +-
 be/src/exec/hdfs-table-writer.h                    |  23 +-
 be/src/exec/parquet/CMakeLists.txt                 |   2 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  18 +
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |   3 +
 be/src/exec/parquet/parquet-column-stats.h         |  15 +
 be/src/exec/parquet/parquet-column-stats.inline.h  |  72 +++
 be/src/exec/parquet/serialize-single-value-test.cc |  66 +++
 be/src/runtime/dml-exec-state.cc                   |  84 +++-
 be/src/runtime/dml-exec-state.h                    |   6 +-
 be/src/util/bit-util.h                             |  12 +
 common/fbs/IcebergObjects.fbs                      |   9 +
 common/protobuf/control_service.proto              |   4 +
 .../impala/service/IcebergCatalogOpExecutor.java   |  33 +-
 infra/python/deps/requirements.txt                 |   1 +
 .../iceberg-partition-transform-insert.test        |  20 +-
 .../QueryTest/iceberg-partitioned-insert.test      |   5 +-
 .../iceberg-upper-lower-bound-metrics.test         | 500 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   | 182 ++++++++
 19 files changed, 1025 insertions(+), 33 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 0395c22..c1b0e87 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -735,7 +735,8 @@ Status HdfsTableSink::FinalizePartitionFile(
     state->dml_exec_state()->UpdatePartition(
         partition->partition_name, partition->current_file_rows,
         &partition->writer->stats());
-    state->dml_exec_state()->AddCreatedFile(*partition);
+    state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(),
+        partition->writer->iceberg_file_stats());
   }
 
   RETURN_IF_ERROR(ClosePartitionFile(state, partition));
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 2d92829..7de3f4f 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -20,7 +20,7 @@
 #define IMPALA_EXEC_HDFS_TABLE_WRITER_H
 
 #include <vector>
-#include <hdfs.h>
+#include "common/hdfs.h"
 
 #include "common/status.h"
 #include "gen-cpp/control_service.pb.h"
@@ -35,6 +35,20 @@ class RowBatch;
 class RuntimeState;
 class ScalarExprEvaluator;
 
+/// Per column statistics to be written to Iceberg manifest files (for each data file).
+/// min_binary and max_binary members contain Single-Value serialized lower and upper
+/// column stats
+/// (https://iceberg.apache.org/spec/#appendix-d-single-value-serialization).
+struct IcebergColumnStats {
+  bool has_min_max_values;
+  std::string min_binary;
+  std::string max_binary;
+  int64_t null_count;
+  int64_t column_size;
+};
+
+typedef std::unordered_map<int, IcebergColumnStats> IcebergFileStats;
+
 /// Pure virtual class for writing to hdfs table partition files.
 /// Subclasses implement the code needed to write to a specific file type.
 /// A subclass needs to implement functions to format and add rows to the file
@@ -90,6 +104,9 @@ class HdfsTableWriter {
   /// Returns the stats for this writer.
   const DmlStatsPB& stats() const { return stats_; }
 
+  /// Returns the stats for the latest iceberg file written by this writer.
+  const IcebergFileStats& iceberg_file_stats() const { return iceberg_file_stats_; }
+
   /// Default block size to use for this file format.  If the file format doesn't
   /// care, it should return 0 and the hdfs config default will be used.
   virtual uint64_t default_block_size() const = 0;
@@ -134,6 +151,10 @@ class HdfsTableWriter {
 
   /// Subclass should populate any file format specific stats.
   DmlStatsPB stats_;
+
+  /// Contains the per-column stats for the latest file written by this writer.
+  /// Used with iceberg only.
+  IcebergFileStats iceberg_file_stats_;
 };
 }
 #endif
diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt
index 0a3efa9..b121fe5 100644
--- a/be/src/exec/parquet/CMakeLists.txt
+++ b/be/src/exec/parquet/CMakeLists.txt
@@ -48,6 +48,7 @@ add_library(ParquetTests STATIC
   parquet-page-index-test.cc
   parquet-plain-test.cc
   parquet-version-test.cc
+  serialize-single-value-test.cc
 )
 add_dependencies(ParquetTests gen-deps)
 
@@ -57,4 +58,5 @@ ADD_UNIFIED_BE_LSAN_TEST(parquet-page-index-test ParquetPageIndex.*)
 ADD_UNIFIED_BE_LSAN_TEST(parquet-plain-test PlainEncoding.*)
 ADD_UNIFIED_BE_LSAN_TEST(parquet-version-test ParquetVersionTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(hdfs-parquet-scanner-test HdfsParquetScannerTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(serialize-single-value-test SerializeSingleValueTest.*)
 
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index b022b66..82746e4 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1673,9 +1673,19 @@ Status HdfsParquetTableWriter::WriteParquetBloomFilter(BaseColumnWriter* col_wri
   return Status::OK();
 }
 
+void HdfsParquetTableWriter::CollectIcebergDmlFileColumnStats(int field_id,
+    const BaseColumnWriter* col_writer) {
+  // Each data file consists of a single row group, so row group null_count / min / max
+  // stats can be used as data file stats.
+  // Get column_size from column writer.
+  col_writer->row_group_stats_base_->GetIcebergStats(col_writer->total_compressed_size(),
+      &iceberg_file_stats_[field_id]);
+}
+
 Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
   if (current_row_group_ == nullptr) return Status::OK();
 
+  const int num_clustering_cols = table_desc_->num_clustering_cols();
   for (int i = 0; i < columns_.size(); ++i) {
     int64_t data_page_offset, dict_page_offset;
     // Flush this column.  This updates the final metadata sizes for this column.
@@ -1701,6 +1711,14 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
         parquet_dml_stats_.mutable_per_column_size();
     (*column_size_map)[col_name] += col_writer->total_compressed_size();
 
+    if (is_iceberg_file_) {
+      const ColumnDescriptor& col_desc =
+          table_desc_->col_descs()[i + num_clustering_cols];
+      DCHECK_EQ(col_desc.name(), col_name);
+      const int field_id = col_desc.field_id();
+      if (field_id != -1) CollectIcebergDmlFileColumnStats(field_id, col_writer);
+    }
+
     // Write encodings and encoding stats for this column
     col_metadata.encodings.clear();
     for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index a13a517..862b9cb 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -156,6 +156,9 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   Status WriteParquetBloomFilter(BaseColumnWriter* col_writer,
       parquet::ColumnMetaData* meta_data) WARN_UNUSED_RESULT;
 
+  /// Add per-column statistics to 'iceberg_file_stats_' for the current data file.
+  void CollectIcebergDmlFileColumnStats(int field_id, const BaseColumnWriter* col_writer);
+
   /// Flushes the current row group to file.  This will compute the final
   /// offsets of column chunks, updating the file metadata.
   Status FlushCurrentRowGroup();
diff --git a/be/src/exec/parquet/parquet-column-stats.h b/be/src/exec/parquet/parquet-column-stats.h
index 5615d15..83128c7 100644
--- a/be/src/exec/parquet/parquet-column-stats.h
+++ b/be/src/exec/parquet/parquet-column-stats.h
@@ -21,6 +21,7 @@
 #include <string>
 #include <type_traits>
 
+#include "exec/hdfs-table-writer.h"
 #include "exec/parquet/parquet-common.h"
 #include "runtime/date-value.h"
 #include "runtime/decimal-value.h"
@@ -112,6 +113,10 @@ class ColumnStatsBase {
   /// Encodes the current values into a Statistics thrift message.
   virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
 
+  /// Writes the current values into IcebergColumnStats struct.
+  /// Min and max stats are Single-value serialized.
+  virtual void GetIcebergStats(int64_t column_size, IcebergColumnStats* out) const = 0;
+
   /// Resets the state of this object.
   void Reset();
 
@@ -203,6 +208,9 @@ class ColumnStats : public ColumnStatsBase {
   virtual int64_t BytesNeeded() const override;
   virtual void EncodeToThrift(parquet::Statistics* out) const override;
 
+  virtual void GetIcebergStats(int64_t column_size,
+      IcebergColumnStats* out) const override;
+
   /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
   /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
   /// otherwise. For timestamps and dates an additional validation will be performed.
@@ -210,11 +218,18 @@ class ColumnStats : public ColumnStatsBase {
       parquet::Type::type parquet_type);
 
  protected:
+  /// For BE tests.
+  FRIEND_TEST(SerializeSingleValueTest, Decimal);
+
   /// Encodes a single value using parquet's plain encoding and stores it into the binary
   /// string 'out'. String values are stored without additional encoding. 'bytes_needed'
   /// must be positive.
   static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);
 
+  /// Single-value serialize value 'v'.
+  /// https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
+  static void SerializeIcebergSingleValue(const T& v, std::string* out);
+
   /// Returns the number of bytes needed to encode value 'v'.
   int64_t BytesNeeded(const T& v) const;
 
diff --git a/be/src/exec/parquet/parquet-column-stats.inline.h b/be/src/exec/parquet/parquet-column-stats.inline.h
index eb29e1c..6c9c91a 100644
--- a/be/src/exec/parquet/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet/parquet-column-stats.inline.h
@@ -22,6 +22,7 @@
 #include "gen-cpp/parquet_types.h"
 #include "parquet-column-stats.h"
 #include "runtime/string-value.inline.h"
+#include "util/bit-util.h"
 
 namespace impala {
 
@@ -312,5 +313,76 @@ inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers
   return Status::OK();
 }
 
+template <typename T>
+inline void ColumnStats<T>::GetIcebergStats(
+    int64_t column_size, IcebergColumnStats* out) const {
+  DCHECK(out != nullptr);
+  out->has_min_max_values = has_min_max_values_;
+  if (out->has_min_max_values) {
+    SerializeIcebergSingleValue(min_value_, &out->min_binary);
+    SerializeIcebergSingleValue(max_value_, &out->max_binary);
+  }
+  out->null_count = null_count_;
+  // Column size is not tracked by ColumnStats.
+  out->column_size = column_size;
+}
+
+// Works for bool(TYPE_BOOLEAN), int32_t(TYPE_INT), int64_t(TYPE_BIGINT, TYPE_TIMESTAMP),
+// float(TYPE_FLOAT), double(TYPE_DOUBLE), DateValue (TYPE_DATE).
+// Serialize values as sizeof(T)-byte little endian.
+template <typename T>
+inline void ColumnStats<T>::SerializeIcebergSingleValue(const T& v, std::string* out) {
+  static_assert(sizeof(T) == 1 || sizeof(T) == 2 || sizeof(T) == 4 || sizeof(T) == 8);
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+  const char* data = reinterpret_cast<const char*>(&v);
+  out->assign(data, sizeof(T));
+#else
+  char little_endian[sizeof(T)] = {};
+  BitUtil::ByteSwap(little_endian, reinterpret_cast<const char*>(&v), sizeof(T));
+  out->assign(little_endian, sizeof(T));
+#endif
+}
+
+// This should never be called.
+// With Iceberg, timestamp values are stored as int64_t with microsecond precision
+// (HdfsParquetTableWriter::timestamp_type_ is set to INT64_MICROS).
+template <>
+inline void ColumnStats<TimestampValue>::SerializeIcebergSingleValue(
+    const TimestampValue& v, std::string* out) {
+  DCHECK(false);
+}
+
+// Serialize StringValue values as UTF-8 bytes (without length).
+template <>
+inline void ColumnStats<StringValue>::SerializeIcebergSingleValue(
+    const StringValue& v, std::string* out) {
+  // With Iceberg, 'v' is UTF-8 encoded (HdfsParquetTableWriter::string_utf8_ is always
+  // set to true).
+  out->assign(v.ptr, v.len);
+}
+
+// Serialize Decimal4Value / Decimal8Value / Decimal16Value values as unscaled,
+// two's-complement big-endian binary, using the minimum number of bytes for the value.
+#define SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(DecimalValue)                            \
+template <>                                                                             \
+inline void ColumnStats<DecimalValue>::SerializeIcebergSingleValue(                     \
+    const DecimalValue& v, std::string* out) {                                          \
+  const auto big_endian_val = BitUtil::ToBigEndian(v.value());                          \
+  const uint8_t* first = reinterpret_cast<const uint8_t*>(&big_endian_val);             \
+  const uint8_t* last = first + sizeof(big_endian_val) - 1;                             \
+  if ((first[0] & 0x80) != 0) {                                                         \
+    for (; first < last && first[0] == 0xFF && (first[1] & 0x80) != 0; ++first) {       \
+    }                                                                                   \
+  } else {                                                                              \
+    for (; first < last && first[0] == 0x00 && (first[1] & 0x80) == 0; ++first) {       \
+    }                                                                                   \
+  }                                                                                     \
+  out->assign(reinterpret_cast<const char*>(first), last - first + 1);                  \
+}
+
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal4Value)
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal8Value)
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal16Value)
+
 } // end ns impala
 #endif
diff --git a/be/src/exec/parquet/serialize-single-value-test.cc b/be/src/exec/parquet/serialize-single-value-test.cc
new file mode 100644
index 0000000..65e76a6
--- /dev/null
+++ b/be/src/exec/parquet/serialize-single-value-test.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "parquet-column-stats.inline.h"
+#include "runtime/decimal-value.inline.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+TEST(SerializeSingleValueTest, Decimal) {
+  Decimal4Value d4;
+  Decimal8Value d8;
+  bool overflow = false;
+  std::string out;
+
+  d4 = Decimal4Value::FromInt(9, 4, -1, &overflow);
+  EXPECT_FALSE(overflow);
+  ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+  // -10000 is 0xffffd8f0, but the result contains only the minimum number of bytes.
+  // Only 2 bytes are required.
+  EXPECT_EQ(std::string("\xD8\xF0", 2), out);
+
+  d4 = Decimal4Value::FromInt(9, 0, -1, &overflow);
+  EXPECT_FALSE(overflow);
+  ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+  // -1 is 0xffffffff, but the result contains only the minimum number of bytes.
+  // Only 1 byte is required.
+  EXPECT_EQ(std::string("\xFF", 1), out);
+
+  d8 = Decimal8Value::FromInt(18, 0, -2147483647, &overflow);
+  EXPECT_FALSE(overflow);
+  ColumnStats<Decimal8Value>::SerializeIcebergSingleValue(d8, &out);
+  // -2147483647 is 0xffffffff80000001. Only 4 bytes are required.
+  EXPECT_EQ(std::string("\x80\x00\x00\x01", 4), out);
+
+  d4 = Decimal4Value::FromInt(9, 4, 0, &overflow);
+  EXPECT_FALSE(overflow);
+  ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+  // 0 is 0x00. Only 1 byte is required.
+  EXPECT_EQ(std::string("\x00", 1), out);
+
+  d8 = Decimal8Value::FromInt(18, 0, 2147483647, &overflow);
+  EXPECT_FALSE(overflow);
+  ColumnStats<Decimal8Value>::SerializeIcebergSingleValue(d8, &out);
+  // 2147483647 is 0x000000007fffffff. Only 4 bytes are required.
+  EXPECT_EQ(std::string("\x7F\xFF\xFF\xFF", 4), out);
+}
+}
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 5faad8d..862eab8 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -463,8 +463,66 @@ void DmlExecState::UpdatePartition(const string& partition_name,
   MergeDmlStats(*insert_stats, entry->second.mutable_stats());
 }
 
-void DmlExecState::AddCreatedFile(const OutputPartition& partition) {
+namespace {
+flatbuffers::Offset<org::apache::impala::fb::FbIcebergColumnStats>
+createIcebergColumnStats(
+    flatbuffers::FlatBufferBuilder& fbb, int field_id,
+    const IcebergColumnStats& col_stats) {
   using namespace org::apache::impala::fb;
+
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lower_bound;
+  flatbuffers::Offset<flatbuffers::Vector<uint8_t>> upper_bound;
+  if (col_stats.has_min_max_values) {
+    const std::string& min_binary = col_stats.min_binary;
+    const uint8_t* data = reinterpret_cast<const uint8_t*>(min_binary.data());
+    lower_bound = fbb.CreateVector(data, min_binary.size());
+
+    const std::string& max_binary = col_stats.max_binary;
+    data = reinterpret_cast<const uint8_t*>(max_binary.data());
+    upper_bound = fbb.CreateVector(data, max_binary.size());
+  }
+
+  FbIcebergColumnStatsBuilder stats_builder(fbb);
+  stats_builder.add_field_id(field_id);
+
+  stats_builder.add_total_compressed_byte_size(col_stats.column_size);
+  stats_builder.add_null_count(col_stats.null_count);
+
+  if (col_stats.has_min_max_values) {
+    stats_builder.add_lower_bound(lower_bound);
+    stats_builder.add_upper_bound(upper_bound);
+  }
+
+  return stats_builder.Finish();
+}
+
+string createIcebergDataFileString(
+    const string& partition_name, const string& final_path, int64_t num_rows,
+    int64_t file_size, const IcebergFileStats& insert_stats) {
+  using namespace org::apache::impala::fb;
+  flatbuffers::FlatBufferBuilder fbb;
+
+  vector<flatbuffers::Offset<FbIcebergColumnStats>> ice_col_stats_vec;
+  for (auto it = insert_stats.cbegin(); it != insert_stats.cend(); ++it) {
+    ice_col_stats_vec.push_back(createIcebergColumnStats(fbb, it->first, it->second));
+  }
+
+  flatbuffers::Offset<FbIcebergDataFile> data_file = CreateFbIcebergDataFile(fbb,
+      fbb.CreateString(final_path),
+      // Currently we can only write Parquet to Iceberg
+      FbFileFormat::FbFileFormat_PARQUET,
+      num_rows,
+      file_size,
+      fbb.CreateString(partition_name),
+      fbb.CreateVector(ice_col_stats_vec));
+  fbb.Finish(data_file);
+  return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), fbb.GetSize());
+}
+
+}
+
+void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
+    const IcebergFileStats& insert_stats) {
   lock_guard<mutex> l(lock_);
   const string& partition_name = partition.partition_name;
   PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
@@ -478,21 +536,11 @@ void DmlExecState::AddCreatedFile(const OutputPartition& partition) {
   }
   file->set_num_rows(partition.current_file_rows);
   file->set_size(partition.current_file_bytes);
-}
-
-string createIcebergDataFileString(
-    const string& partition_name, const DmlFileStatusPb& file) {
-  using namespace org::apache::impala::fb;
-  flatbuffers::FlatBufferBuilder fbb;
-  flatbuffers::Offset<FbIcebergDataFile> data_file = CreateFbIcebergDataFile(fbb,
-      fbb.CreateString(file.final_path()),
-      // Currently we can only write Parquet to Iceberg
-      FbFileFormat::FbFileFormat_PARQUET,
-      file.num_rows(),
-      file.size(),
-      fbb.CreateString(partition_name));
-  fbb.Finish(data_file);
-  return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), fbb.GetSize());
+  if (is_iceberg) {
+    file->set_iceberg_data_file_fb(
+        createIcebergDataFileString(partition_name, file->final_path(), file->num_rows(),
+        file->size(), insert_stats));
+  }
 }
 
 vector<string> DmlExecState::CreateIcebergDataFilesVector() {
@@ -501,7 +549,9 @@ vector<string> DmlExecState::CreateIcebergDataFilesVector() {
   for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
     for (int i = 0; i < partition.second.created_files_size(); ++i) {
       const DmlFileStatusPb& file = partition.second.created_files(i);
-      ret.emplace_back(createIcebergDataFileString(partition.first, file));
+      if (file.has_iceberg_data_file_fb()) {
+        ret.push_back(file.iceberg_data_file_fb());
+      }
     }
   }
   return ret;
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index c6b9a1a..ecb150f 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -24,12 +24,14 @@
 
 #include "common/hdfs.h"
 #include "common/status.h"
+#include "exec/hdfs-table-writer.h"
 
 namespace impala {
 
 class DmlExecStatusPB;
 class DmlPartitionStatusPB;
 class DmlStatsPB;
+class DmlDataFileStatsPB;
 struct OutputPartition;
 class TDmlResult;
 class TFinalizeParams;
@@ -71,7 +73,9 @@ class DmlExecState {
       int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats);
 
   /// Extract information from 'partition', and add a new Iceberg data file.
-  void AddCreatedFile(const OutputPartition& partition);
+  /// 'insert_stats' contains stats for the Iceberg data file.
+  void AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
+      const IcebergFileStats& insert_stats);
 
   /// Used to initialize this state when execute Kudu DML. Must be called before
   /// SetKuduDmlStats().
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 87ab094..b813880 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -204,6 +204,11 @@ class BitUtil {
 /// Converts to big endian format (if not already in big endian) from the
 /// machine's native endian format.
 #if __BYTE_ORDER == __LITTLE_ENDIAN
+  static inline __int128_t ToBigEndian(__int128_t value) {
+    __int128_t res = 0;
+    ByteSwap(&res, &value, sizeof(__int128_t));
+    return res;
+  }
   static inline int64_t ToBigEndian(int64_t value) { return ByteSwap(value); }
   static inline uint64_t ToBigEndian(uint64_t value) { return ByteSwap(value); }
   static inline int32_t ToBigEndian(int32_t value) { return ByteSwap(value); }
@@ -211,6 +216,7 @@ class BitUtil {
   static inline int16_t ToBigEndian(int16_t value) { return ByteSwap(value); }
   static inline uint16_t ToBigEndian(uint16_t value) { return ByteSwap(value); }
 #else
+  static inline __int128_t ToBigEndian(__int128_t val) { return val; }
   static inline int64_t ToBigEndian(int64_t val) { return val; }
   static inline uint64_t ToBigEndian(uint64_t val) { return val; }
   static inline int32_t ToBigEndian(int32_t val) { return val; }
@@ -221,6 +227,11 @@ class BitUtil {
 
 /// Converts from big endian format to the machine's native endian format.
 #if __BYTE_ORDER == __LITTLE_ENDIAN
+  static inline __int128_t FromBigEndian(__int128_t value) {
+    __int128_t res = 0;
+    ByteSwap(&res, &value, sizeof(__int128_t));
+    return res;
+  }
   static inline int64_t FromBigEndian(int64_t value) { return ByteSwap(value); }
   static inline uint64_t FromBigEndian(uint64_t value) { return ByteSwap(value); }
   static inline int32_t FromBigEndian(int32_t value) { return ByteSwap(value); }
@@ -228,6 +239,7 @@ class BitUtil {
   static inline int16_t FromBigEndian(int16_t value) { return ByteSwap(value); }
   static inline uint16_t FromBigEndian(uint16_t value) { return ByteSwap(value); }
 #else
+  static inline __int128_t FromBigEndian(__int128_t val) { return val; }
   static inline int64_t FromBigEndian(int64_t val) { return val; }
   static inline uint64_t FromBigEndian(uint64_t val) { return val; }
   static inline int32_t FromBigEndian(int32_t val) { return val; }
diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index 80d7f1d..585c300 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -22,11 +22,20 @@ enum FbFileFormat: byte {
   ORC
 }
 
+table FbIcebergColumnStats {
+  field_id: int;
+  total_compressed_byte_size: long;
+  null_count: long;
+  lower_bound: [ubyte];
+  upper_bound: [ubyte];
+}
+
 table FbIcebergDataFile {
   path: string;
   format: FbFileFormat = PARQUET;
   record_count: long = 0;
   file_size_in_bytes: long = 0;
   partition_path: string;
+  per_column_stats: [FbIcebergColumnStats];
 }
 
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 69a286a..4a4a3cb 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -63,6 +63,10 @@ message DmlFileStatusPb {
   // TODO: this could be merged with final_path by storing only the suffix
   //       of the path and append it to a staging/final prefix from DmlPartitionStatusPB
   optional string staging_path = 4;
+
+  // Flat buffer encoded Iceberg data file (see FbIcebergDataFile). Contains metadata and
+  // stats for the iceberg file.
+  optional bytes iceberg_data_file_fb = 5;
 }
 
 // Per-partition statistics and metadata resulting from DML statements.
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 01351ad..33a47df 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -28,13 +28,14 @@ import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
-import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
@@ -47,6 +48,7 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.fb.FbIcebergColumnStats;
 import org.apache.impala.fb.FbIcebergDataFile;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
@@ -273,9 +275,12 @@ public class IcebergCatalogOpExecutor {
     }
     for (ByteBuffer buf : dataFilesFb) {
       FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
+
       PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
+      Metrics metrics = buildDataFileMetrics(feIcebergTable, dataFile);
       DataFiles.Builder builder =
           DataFiles.builder(partSpec)
+          .withMetrics(metrics)
           .withPath(dataFile.path())
           .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format()))
           .withRecordCount(dataFile.recordCount())
@@ -289,6 +294,32 @@ public class IcebergCatalogOpExecutor {
     batchWrite.commit();
   }
 
+  private static Metrics buildDataFileMetrics(FeIcebergTable feIcebergTable,
+      FbIcebergDataFile dataFile) {
+    Map<Integer, Long> columnSizes = new HashMap<>();
+    Map<Integer, Long> nullValueCounts = new HashMap<>();
+    Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+    Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+    for (int i = 0; i < dataFile.perColumnStatsLength(); ++i) {
+      FbIcebergColumnStats stats = dataFile.perColumnStats(i);
+      if (stats != null) {
+        int fieldId = stats.fieldId();
+        if (fieldId != -1) {
+          columnSizes.put(fieldId, stats.totalCompressedByteSize());
+          nullValueCounts.put(fieldId, stats.nullCount());
+          if (stats.lowerBoundLength() > 0) {
+            lowerBounds.put(fieldId, stats.lowerBoundAsByteBuffer());
+          }
+          if (stats.upperBoundLength() > 0) {
+            upperBounds.put(fieldId, stats.upperBoundAsByteBuffer());
+          }
+        }
+      }
+    }
+    return new Metrics(dataFile.recordCount(), columnSizes, null,
+        nullValueCounts, null, lowerBounds, upperBounds);
+  }
+
   /**
    * Creates new snapshot for the iceberg table by deleting all data files.
    */
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index f86f084..cefff6d 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -66,6 +66,7 @@ six == 1.14.0
 sqlparse == 0.3.1
 texttable == 0.8.3
 virtualenv == 16.7.10
+avro==1.10.2
 
 # Required for Kudu:
   Cython == 0.29.14
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
index 1d75729..01a0513 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
@@ -106,7 +106,7 @@ where i = 2;
 INT,STRING,DATE,TIMESTAMP
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -142,7 +142,7 @@ where t = '2021-01-03 03:03:03.030000000';
 INT,STRING,DATE,TIMESTAMP
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test single col TRUNCATE
@@ -269,7 +269,7 @@ where s = 'the quick brown fox jumps over the lazy dog';
 INT,BIGINT,DECIMAL,STRING
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 2
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -280,7 +280,7 @@ where s = 'the quick impala';
 INT,BIGINT,DECIMAL,STRING
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumRowGroups): 0
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -402,7 +402,7 @@ where t = '1969-02-15 13:55:03';
 TIMESTAMP,DATE
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -452,7 +452,7 @@ where d = '1969-12-15';
 TIMESTAMP,DATE
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Create table with MONTH partition transform
@@ -560,7 +560,7 @@ where d = '1969-12-15';
 TIMESTAMP,DATE
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Create table with DAY partition transform
@@ -739,7 +739,7 @@ where t = '1969-12-31 22:55:03';
 TIMESTAMP
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -751,7 +751,7 @@ where t = '1969-12-31 23:55:03';
 TIMESTAMP
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
@@ -827,7 +827,7 @@ where s = 'quick brown dog';
 STRING,BIGINT,DECIMAL,TIMESTAMP,DATE
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # Test partition pruning with RUNTIME_PROFILE.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 6ee2edd..c8d3a68 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -211,7 +211,8 @@ BIGINT
 aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
-# 'timestamp_col' is not a partitioning column.
+# 'timestamp_col' is not a partitioning column, but min/max stats will be used to
+# eliminate row groups
 select count(*) from alltypes_part
 where timestamp_col = now();
 ---- RESULTS
@@ -219,7 +220,7 @@ where timestamp_col = now();
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 8
+aggregation(SUM, NumRowGroups): 0
 ====
 ---- QUERY
 # Iceberg partitions independent of column order
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
new file mode 100644
index 0000000..d20fef8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -0,0 +1,500 @@
+====
+---- QUERY
+# Test STRING/INT/BIGINT/FLOAT/DOUBLE metrics
+create table ice_types1 (p INT, s STRING, i INT, bi BIGINT, f FLOAT, db DOUBLE)
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types1 values
+    (0, 'aluminium', 1, 1, 0.126, 0.126), (0, 'bromine',  2, 2, 0.131, 0.131), (0, 'carbon', 3, 3, 3.45, 3.45),
+    (1, 'dubnium', 4, 4, 3.567, 3.567), (1, 'europium', 5, 5, 3.99, 3.99), (1, 'fermium', 6, 6, 4.01, 4.01),
+    (2, 'fermium', 6, 6, 4.01, 4.01), (2, 'helium', 8, 8, 4.65, 4.65), (2, 'iridium', 9, 9, 4.67, 4.67);
+select count(*) from ice_types1;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types1;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where s >= 'z';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where s >= 'b' and s <= 'cz';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where s >= 'b' and s <= 'dz';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 'fermium' is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where s = 'fermium';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where i >= 10;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where i >= 2 and i <= 3;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where i >= 2 and i <= 4;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 6 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where i = 6;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where bi >= 10;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where bi >= 2 and bi <= 3;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where bi >= 2 and bi <= 4;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 6 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where bi = 6;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where f >= 5.0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where f >= 0.13 and f <= 3.451;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where f >= 0.13 and f <= 3.57;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 4.01 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where f = 4.01;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where db >= 5.0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where db >= 0.13 and db <= 3.451;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where db >= 0.13 and db <= 3.57;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 4.01 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where db = 4.01;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Test DATE/TIMESTAMP metrics
+create table ice_types2 (p INT, dt DATE, ts TIMESTAMP)
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types2 values
+    (0, DATE'1400-01-01', '1400-01-01 00:00:00'), (0, DATE'1969-12-31', '1969-12-31 12:34:59'), (0, DATE'1970-01-01', '1969-12-31 12:35:00'),
+    (1, DATE'1970-01-02', '1969-12-31 12:35:01'), (1, DATE'1999-12-30', '1999-12-30 01:11:35'), (1, DATE'1999-12-31', '1999-12-31 23:59:59'),
+    (2, DATE'1999-12-31', '1999-12-31 23:59:59'), (2, DATE'9999-12-30', '9999-12-30 00:23:42'), (2, DATE'9999-12-30', '9999-12-31 03:04:05');
+select count(*) from ice_types2;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types2;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types2
+where dt >= DATE'9999-12-31';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types2
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-01';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types2
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-02';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 1999-12-31 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types2
+where dt = DATE'1999-12-31';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types2
+where ts >= '9999-12-31 03:04:06';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types2
+where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:00';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types2
+where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:01';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# '1999-12-31 23:59:59' is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types2
+where ts = '1999-12-31 23:59:59';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Test DECIMAL metrics
+create table ice_types3 (p INT, d1 DECIMAL(9, 3), d2 DECIMAL(18, 3), d3 DECIMAL(38, 3))
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types3 values
+    (0, 123.456, 1234567890.123, 1234567890123456789.012),
+    (0, 123.457, 1234567890.124, 1234567890123456789.013),
+    (0, 123.458, 1234567890.125, 1234567890123456789.014),
+    (1, 123.459, 1234567890.126, 1234567890123456789.015),
+    (1, 333.333, 3333333333.333, 3333333333333333333.333),
+    (1, 341.234, 3412345678.901, 3412345678901234567.89),
+    (2, 341.234, 3412345678.901, 3412345678901234567.89),
+    (2, 123456.789, 123456789012345.678, 12345678901234567890123456789012345.678),
+    (2, 123456.790, 123456789012345.679, 12345678901234567890123456789012345.68);
+select count(*) from ice_types3;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types3;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d1 >= 123456.791;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d1 >= 123.457 and d1 <= 123.458;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d1 >= 123.457 and d1 <= 123.459;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 341.234 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d1 = 341.234;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d2 >= 123456789012345.680;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d2 >= 1234567890.124 and d2 <= 1234567890.125;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d2 >= 1234567890.124 and d2 <= 1234567890.126;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 3412345678.901 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d2 = 3412345678.901;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d3 >= 12345678901234567890123456789012345.681;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.014;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.015;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 3412345678901234567.89 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d3 = 3412345678901234567.89;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 987dd42..489e384 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -23,6 +23,10 @@ import time
 from subprocess import check_call
 from parquet.ttypes import ConvertedType
 
+from avro.datafile import DataFileReader
+from avro.io import DatumReader
+import json
+
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.skip import SkipIf
 
@@ -263,3 +267,181 @@ class TestIcebergTable(ImpalaTestSuite):
     assert a_schema_element.converted_type == ConvertedType.UTF8
 
     os.remove(local_file)
+
+  # Get hdfs path to the manifest list that belongs to the snapshot identified by
+  # 'snapshot_counter'.
+  def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name,
+      snapshot_counter):
+    local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000))
+    hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json'
+        % (db_name, table_name, snapshot_counter))
+    check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
+
+    manifest_list_hdfs_path = None
+    try:
+      with open(local_path, 'r') as fp:
+        metadata = json.load(fp)
+        current_snapshot_id = metadata['current-snapshot-id']
+        for snapshot in metadata['snapshots']:
+          if snapshot['snapshot-id'] == current_snapshot_id:
+            manifest_list_hdfs_path = snapshot['manifest-list']
+            break
+    finally:
+      os.remove(local_path)
+    return manifest_list_hdfs_path
+
+  # Get list of hdfs paths to manifest files from the manifest list avro file.
+  def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path):
+    local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000))
+    check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path])
+
+    manifest_hdfs_path_list = []
+    reader = None
+    try:
+      with open(local_path, 'rb') as fp:
+        reader = DataFileReader(fp, DatumReader())
+        for manifest in reader:
+          manifest_hdfs_path_list.append(manifest['manifest_path'])
+    finally:
+      if reader:
+        reader.close()
+      os.remove(local_path)
+    return manifest_hdfs_path_list
+
+  # Get 'data_file' structs from avro manifest files.
+  def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list):
+    datafiles = []
+    for hdfs_path in manifest_hdfs_path_list:
+      local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000))
+      check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
+
+      reader = None
+      try:
+        with open(local_path, 'rb') as fp:
+          reader = DataFileReader(fp, DatumReader())
+          datafiles.extend([rec['data_file'] for rec in reader])
+      finally:
+        if reader:
+          reader.close()
+        os.remove(local_path)
+    return datafiles
+
+  @SkipIf.not_hdfs
+  def test_writing_metrics_to_metadata(self, vector, unique_database):
+    # Create table
+    table_name = "ice_stats"
+    qualified_table_name = "%s.%s" % (unique_database, table_name)
+    query = 'create table %s ' \
+        '(s string, i int, b boolean, bi bigint, ts timestamp, dt date, ' \
+        'dc decimal(10, 3)) ' \
+        'stored as iceberg' \
+        % qualified_table_name
+    self.client.execute(query)
+
+    # Insert data
+    # 1st data file:
+    query = 'insert into %s values ' \
+        '("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \
+        '("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \
+        '("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \
+        '(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \
+        % qualified_table_name
+    self.execute_query(query)
+    # 2nd data file:
+    query = 'insert into %s values ' \
+        '(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \
+        '(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \
+        % qualified_table_name
+    self.execute_query(query)
+
+    # Get hdfs path to manifest list file
+    manifest_list_hdfs_path = self.get_manifest_list_hdfs_path(
+        '/tmp/iceberg_metrics_test', unique_database, table_name, '00002')
+
+    # Get the list of hdfs paths to manifest files
+    assert manifest_list_hdfs_path is not None
+    manifest_hdfs_path_list = self.get_manifest_hdfs_path_list(
+        '/tmp/iceberg_metrics_test', manifest_list_hdfs_path)
+
+    # Get 'data_file' records from manifest files.
+    assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0
+    datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test',
+        manifest_hdfs_path_list)
+
+    # Check column stats in datafiles
+    assert datafiles is not None and len(datafiles) == 2
+
+    # The 1st datafile contains the 2 NULL rows
+    assert datafiles[0]['record_count'] == 2
+    assert datafiles[0]['column_sizes'] == \
+        [{'key': 1, 'value': 39},
+         {'key': 2, 'value': 39},
+         {'key': 3, 'value': 25},
+         {'key': 4, 'value': 39},
+         {'key': 5, 'value': 39},
+         {'key': 6, 'value': 39},
+         {'key': 7, 'value': 39}]
+    assert datafiles[0]['null_value_counts'] == \
+        [{'key': 1, 'value': 2},
+         {'key': 2, 'value': 2},
+         {'key': 3, 'value': 2},
+         {'key': 4, 'value': 2},
+         {'key': 5, 'value': 2},
+         {'key': 6, 'value': 2},
+         {'key': 7, 'value': 2}]
+    # Upper/lower bounds should be empty lists
+    assert datafiles[0]['lower_bounds'] == []
+    assert datafiles[0]['upper_bounds'] == []
+
+    # 2nd datafile
+    assert datafiles[1]['record_count'] == 4
+    assert datafiles[1]['column_sizes'] == \
+        [{'key': 1, 'value': 66},
+         {'key': 2, 'value': 56},
+         {'key': 3, 'value': 26},
+         {'key': 4, 'value': 59},
+         {'key': 5, 'value': 68},
+         {'key': 6, 'value': 56},
+         {'key': 7, 'value': 53}]
+    assert datafiles[1]['null_value_counts'] == \
+        [{'key': 1, 'value': 1},
+         {'key': 2, 'value': 1},
+         {'key': 3, 'value': 2},
+         {'key': 4, 'value': 2},
+         {'key': 5, 'value': 1},
+         {'key': 6, 'value': 1},
+         {'key': 7, 'value': 2}]
+    assert datafiles[1]['lower_bounds'] == \
+        [{'key': 1, 'value': 'abc'},
+         # INT is serialized as 4-byte little endian
+         {'key': 2, 'value': '\x00\x00\x00\x00'},
+         # BOOLEAN is serialized as 0x00 for FALSE
+         {'key': 3, 'value': '\x00'},
+         # BIGINT is serialized as 8-byte little endian
+         {'key': 4, 'value': '\x40\xaf\x0d\x86\x48\x70\x00\x00'},
+         # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
+         # 1970-01-01 00:00:00)
+         {'key': 5, 'value': '\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
+         # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
+         {'key': 6, 'value': '\x93\xfe\xff\xff'},
+         # Unlike other numerical values, DECIMAL is serialized as big-endian.
+         {'key': 7, 'value': '\xd8\xf0'}]
+    assert datafiles[1]['upper_bounds'] == \
+        [{'key': 1, 'value': 'ghij'},
+         # INT is serialized as 4-byte little endian
+         {'key': 2, 'value': '\x03\x00\x00\x00'},
+         # BOOLEAN is serialized as 0x01 for TRUE
+         {'key': 3, 'value': '\x01'},
+         # BIGINT is serialized as 8-byte little endian
+         {'key': 4, 'value': '\x81\x58\xc2\x97\x56\xd5\x00\x00'},
+         # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
+         # 1970-01-01 00:00:00)
+         {'key': 5, 'value': '\x80\x02\x86\xef\x2f\x00\x00\x00'},
+         # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
+         {'key': 6, 'value': '\x6d\x01\x00\x00'},
+         # Unlike other numerical values, DECIMAL is serialized as big-endian.
+         {'key': 7, 'value': '\x00\xdc\x14'}]
+
+  def test_using_upper_lower_bound_metrics(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector,
+        use_db=unique_database)