You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/09/06 09:31:23 UTC
[impala] branch master updated: IMPALA-10879: Add parquet stats to
iceberg manifest
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new c8aa579 IMPALA-10879: Add parquet stats to iceberg manifest
c8aa579 is described below
commit c8aa5796d93510723342055cc70cf8d00abae754
Author: Attila Jeges <at...@cloudera.com>
AuthorDate: Fri Aug 6 16:51:57 2021 +0200
IMPALA-10879: Add parquet stats to iceberg manifest
This patch adds Parquet stats to the Iceberg manifest as per-data-file
metrics.
The following metrics are supported:
- column_sizes :
Map from column id to the total size on disk of all regions that
store the column. Does not include bytes necessary to read other
columns, like footers.
- null_value_counts :
Map from column id to number of null values in the column.
- lower_bounds :
Map from column id to lower bound in the column serialized as
binary. Each value must be less than or equal to all non-null,
non-NaN values in the column for the file.
- upper_bounds :
Map from column id to upper bound in the column serialized as
binary. Each value must be greater than or equal to all non-null,
non-NaN values in the column for the file.
The corresponding parquet stats are collected by 'ColumnStats'
(in 'min_value_', 'max_value_', 'null_count_' members) and
'HdfsParquetTableWriter::BaseColumnWriter' (in
'total_compressed_byte_size_' member).
Testing:
- New e2e test was added to verify that the metrics are written to the
Iceberg manifest upon inserting data.
- New e2e test was added to verify that lower_bounds/upper_bounds
metrics are used to prune data files on querying iceberg tables.
- Existing e2e tests were updated to work with the new behavior.
- New BE test was added for single-value serialization.
Relevant Iceberg documentation:
- Manifest:
https://iceberg.apache.org/spec/#manifests
- Values in lower_bounds and upper_bounds maps should be Single-value
serialized to binary:
https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
Change-Id: Ic31f2260bc6f6a7f307ac955ff05eb154917675b
Reviewed-on: http://gerrit.cloudera.org:8080/17806
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Attila Jeges <at...@cloudera.com>
---
be/src/exec/hdfs-table-sink.cc | 3 +-
be/src/exec/hdfs-table-writer.h | 23 +-
be/src/exec/parquet/CMakeLists.txt | 2 +
be/src/exec/parquet/hdfs-parquet-table-writer.cc | 18 +
be/src/exec/parquet/hdfs-parquet-table-writer.h | 3 +
be/src/exec/parquet/parquet-column-stats.h | 15 +
be/src/exec/parquet/parquet-column-stats.inline.h | 72 +++
be/src/exec/parquet/serialize-single-value-test.cc | 66 +++
be/src/runtime/dml-exec-state.cc | 84 +++-
be/src/runtime/dml-exec-state.h | 6 +-
be/src/util/bit-util.h | 12 +
common/fbs/IcebergObjects.fbs | 9 +
common/protobuf/control_service.proto | 4 +
.../impala/service/IcebergCatalogOpExecutor.java | 33 +-
infra/python/deps/requirements.txt | 1 +
.../iceberg-partition-transform-insert.test | 20 +-
.../QueryTest/iceberg-partitioned-insert.test | 5 +-
.../iceberg-upper-lower-bound-metrics.test | 500 +++++++++++++++++++++
tests/query_test/test_iceberg.py | 182 ++++++++
19 files changed, 1025 insertions(+), 33 deletions(-)
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 0395c22..c1b0e87 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -735,7 +735,8 @@ Status HdfsTableSink::FinalizePartitionFile(
state->dml_exec_state()->UpdatePartition(
partition->partition_name, partition->current_file_rows,
&partition->writer->stats());
- state->dml_exec_state()->AddCreatedFile(*partition);
+ state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(),
+ partition->writer->iceberg_file_stats());
}
RETURN_IF_ERROR(ClosePartitionFile(state, partition));
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 2d92829..7de3f4f 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -20,7 +20,7 @@
#define IMPALA_EXEC_HDFS_TABLE_WRITER_H
#include <vector>
-#include <hdfs.h>
+#include "common/hdfs.h"
#include "common/status.h"
#include "gen-cpp/control_service.pb.h"
@@ -35,6 +35,20 @@ class RowBatch;
class RuntimeState;
class ScalarExprEvaluator;
+/// Per-column statistics to be written to Iceberg manifest files (for each data file).
+/// The min_binary and max_binary members contain the single-value serialized lower and
+/// upper column bounds
+/// (https://iceberg.apache.org/spec/#appendix-d-single-value-serialization).
+struct IcebergColumnStats {
+ bool has_min_max_values;  // True if min_binary / max_binary hold serialized bounds.
+ std::string min_binary;  // Serialized lower bound; written only with min/max values.
+ std::string max_binary;  // Serialized upper bound; written only with min/max values.
+ int64_t null_count;  // Number of null values in the column.
+ int64_t column_size;  // Total compressed byte size of the column in the data file.
+};
+
+typedef std::unordered_map<int, IcebergColumnStats> IcebergFileStats;
+
/// Pure virtual class for writing to hdfs table partition files.
/// Subclasses implement the code needed to write to a specific file type.
/// A subclass needs to implement functions to format and add rows to the file
@@ -90,6 +104,9 @@ class HdfsTableWriter {
/// Returns the stats for this writer.
const DmlStatsPB& stats() const { return stats_; }
+ /// Returns the per-column stats of the latest Iceberg file written by this writer.
+ const IcebergFileStats& iceberg_file_stats() const { return iceberg_file_stats_; }
+
/// Default block size to use for this file format. If the file format doesn't
/// care, it should return 0 and the hdfs config default will be used.
virtual uint64_t default_block_size() const = 0;
@@ -134,6 +151,10 @@ class HdfsTableWriter {
/// Subclass should populate any file format specific stats.
DmlStatsPB stats_;
+
+ /// Contains the per-column stats for the latest file written by this writer.
+ /// Used with iceberg only.
+ IcebergFileStats iceberg_file_stats_;
};
}
#endif
diff --git a/be/src/exec/parquet/CMakeLists.txt b/be/src/exec/parquet/CMakeLists.txt
index 0a3efa9..b121fe5 100644
--- a/be/src/exec/parquet/CMakeLists.txt
+++ b/be/src/exec/parquet/CMakeLists.txt
@@ -48,6 +48,7 @@ add_library(ParquetTests STATIC
parquet-page-index-test.cc
parquet-plain-test.cc
parquet-version-test.cc
+ serialize-single-value-test.cc
)
add_dependencies(ParquetTests gen-deps)
@@ -57,4 +58,5 @@ ADD_UNIFIED_BE_LSAN_TEST(parquet-page-index-test ParquetPageIndex.*)
ADD_UNIFIED_BE_LSAN_TEST(parquet-plain-test PlainEncoding.*)
ADD_UNIFIED_BE_LSAN_TEST(parquet-version-test ParquetVersionTest.*)
ADD_UNIFIED_BE_LSAN_TEST(hdfs-parquet-scanner-test HdfsParquetScannerTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(serialize-single-value-test SerializeSingleValueTest.*)
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index b022b66..82746e4 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -1673,9 +1673,19 @@ Status HdfsParquetTableWriter::WriteParquetBloomFilter(BaseColumnWriter* col_wri
return Status::OK();
}
+void HdfsParquetTableWriter::CollectIcebergDmlFileColumnStats(int field_id,
+ const BaseColumnWriter* col_writer) {
+ // Each data file consists of a single row group, so the row group's null_count / min /
+ // max stats can be used as data file stats. The column size is not part of the row
+ // group stats object, so it is taken from the column writer and passed in explicitly.
+ col_writer->row_group_stats_base_->GetIcebergStats(col_writer->total_compressed_size(),
+ &iceberg_file_stats_[field_id]);  // operator[] creates the map entry if missing.
+}
+
Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
if (current_row_group_ == nullptr) return Status::OK();
+ const int num_clustering_cols = table_desc_->num_clustering_cols();
for (int i = 0; i < columns_.size(); ++i) {
int64_t data_page_offset, dict_page_offset;
// Flush this column. This updates the final metadata sizes for this column.
@@ -1701,6 +1711,14 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
parquet_dml_stats_.mutable_per_column_size();
(*column_size_map)[col_name] += col_writer->total_compressed_size();
+ if (is_iceberg_file_ ) {
+ const ColumnDescriptor& col_desc =
+ table_desc_->col_descs()[i + num_clustering_cols];
+ DCHECK_EQ(col_desc.name(), col_name);
+ const int field_id = col_desc.field_id();
+ if (field_id != -1) CollectIcebergDmlFileColumnStats(field_id, col_writer);
+ }
+
// Write encodings and encoding stats for this column
col_metadata.encodings.clear();
for (parquet::Encoding::type encoding : col_writer->column_encodings_) {
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index a13a517..862b9cb 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -156,6 +156,9 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
Status WriteParquetBloomFilter(BaseColumnWriter* col_writer,
parquet::ColumnMetaData* meta_data) WARN_UNUSED_RESULT;
+ /// Add per-column statistics to 'iceberg_file_stats_' for the current data file.
+ void CollectIcebergDmlFileColumnStats(int field_id, const BaseColumnWriter* col_writer);
+
/// Flushes the current row group to file. This will compute the final
/// offsets of column chunks, updating the file metadata.
Status FlushCurrentRowGroup();
diff --git a/be/src/exec/parquet/parquet-column-stats.h b/be/src/exec/parquet/parquet-column-stats.h
index 5615d15..83128c7 100644
--- a/be/src/exec/parquet/parquet-column-stats.h
+++ b/be/src/exec/parquet/parquet-column-stats.h
@@ -21,6 +21,7 @@
#include <string>
#include <type_traits>
+#include "exec/hdfs-table-writer.h"
#include "exec/parquet/parquet-common.h"
#include "runtime/date-value.h"
#include "runtime/decimal-value.h"
@@ -112,6 +113,10 @@ class ColumnStatsBase {
/// Encodes the current values into a Statistics thrift message.
virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
+ /// Writes the current values into IcebergColumnStats struct.
+ /// Min and max stats are Single-value serialized.
+ virtual void GetIcebergStats(int64_t column_size, IcebergColumnStats* out) const = 0;
+
/// Resets the state of this object.
void Reset();
@@ -203,6 +208,9 @@ class ColumnStats : public ColumnStatsBase {
virtual int64_t BytesNeeded() const override;
virtual void EncodeToThrift(parquet::Statistics* out) const override;
+ virtual void GetIcebergStats(int64_t column_size,
+ IcebergColumnStats* out) const override;
+
/// Decodes the plain encoded stats value from 'buffer' and writes the result into the
/// buffer pointed to by 'slot'. Returns true if decoding was successful, false
/// otherwise. For timestamps and dates an additional validation will be performed.
@@ -210,11 +218,18 @@ class ColumnStats : public ColumnStatsBase {
parquet::Type::type parquet_type);
protected:
+ /// For BE tests.
+ FRIEND_TEST(SerializeSingleValueTest, Decimal);
+
/// Encodes a single value using parquet's plain encoding and stores it into the binary
/// string 'out'. String values are stored without additional encoding. 'bytes_needed'
/// must be positive.
static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* out);
+ /// Single-value serialize value 'v'.
+ /// https://iceberg.apache.org/spec/#appendix-d-single-value-serialization
+ static void SerializeIcebergSingleValue(const T& v, std::string* out);
+
/// Returns the number of bytes needed to encode value 'v'.
int64_t BytesNeeded(const T& v) const;
diff --git a/be/src/exec/parquet/parquet-column-stats.inline.h b/be/src/exec/parquet/parquet-column-stats.inline.h
index eb29e1c..6c9c91a 100644
--- a/be/src/exec/parquet/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet/parquet-column-stats.inline.h
@@ -22,6 +22,7 @@
#include "gen-cpp/parquet_types.h"
#include "parquet-column-stats.h"
#include "runtime/string-value.inline.h"
+#include "util/bit-util.h"
namespace impala {
@@ -312,5 +313,76 @@ inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers
return Status::OK();
}
+template <typename T>
+inline void ColumnStats<T>::GetIcebergStats(
+ int64_t column_size, IcebergColumnStats* out) const {
+ DCHECK(out != nullptr);  // 'out' is the output struct and must be valid.
+ out->has_min_max_values = has_min_max_values_;
+ if (out->has_min_max_values) {  // Otherwise min_binary / max_binary are not written.
+ SerializeIcebergSingleValue(min_value_, &out->min_binary);
+ SerializeIcebergSingleValue(max_value_, &out->max_binary);
+ }
+ out->null_count = null_count_;
+ // Column size is not tracked by ColumnStats, so the caller passes it in.
+ out->column_size = column_size;
+}
+
+// Generic case, used for bool (TYPE_BOOLEAN), int32_t (TYPE_INT), int64_t (TYPE_BIGINT,
+// TYPE_TIMESTAMP), float (TYPE_FLOAT), double (TYPE_DOUBLE) and DateValue (TYPE_DATE).
+// Serializes 'v' into 'out' as sizeof(T) bytes in little-endian order.
+template <typename T>
+inline void ColumnStats<T>::SerializeIcebergSingleValue(const T& v, std::string* out) {
+ static_assert(sizeof(T) == 1 || sizeof(T) == 2 || sizeof(T) == 4 || sizeof(T) == 8);
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+ const char* data = reinterpret_cast<const char*>(&v);
+ out->assign(data, sizeof(T));  // The in-memory bytes are already little endian.
+#else
+ char little_endian[sizeof(T)] = {};  // Receives the byte-swapped copy of 'v'.
+ BitUtil::ByteSwap(little_endian, reinterpret_cast<const char*>(&v), sizeof(T));
+ out->assign(little_endian, sizeof(T));
+#endif
+}
+
+// This should never be called: with Iceberg, timestamp values are stored as int64_t
+// with microsecond precision (HdfsParquetTableWriter::timestamp_type_ is set to
+// INT64_MICROS), so they are serialized by the generic int64_t implementation instead.
+template <>
+inline void ColumnStats<TimestampValue>::SerializeIcebergSingleValue(
+ const TimestampValue& v, std::string* out) {
+ DCHECK(false);  // 'out' is never written by this specialization.
+}
+
+// Serializes a StringValue as its UTF-8 bytes (without a length prefix).
+template <>
+inline void ColumnStats<StringValue>::SerializeIcebergSingleValue(
+ const StringValue& v, std::string* out) {
+ // With Iceberg, 'v' is UTF-8 encoded (HdfsParquetTableWriter::string_utf8_ is always
+ // set to true).
+ out->assign(v.ptr, v.len);  // Copies 'v.len' bytes starting at 'v.ptr'.
+}
+
+// Serialize Decimal4Value / Decimal8Value / Decimal16Value values as unscaled,
+// two's-complement big-endian binary, using the minimum number of bytes for the value.
+#define SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(DecimalValue) \
+template <> \
+inline void ColumnStats<DecimalValue>::SerializeIcebergSingleValue( \
+ const DecimalValue& v, std::string* out) { \
+ const auto big_endian_val = BitUtil::ToBigEndian(v.value()); \
+ const uint8_t* first = reinterpret_cast<const uint8_t*>(&big_endian_val); \
+ const uint8_t* last = first + sizeof(big_endian_val) - 1; \
+ if ((first[0] & 0x80) != 0) { \
+ for (; first < last && first[0] == 0xFF && (first[1] & 0x80) != 0; ++first) { \
+ } \
+ } else { \
+ for (; first < last && first[0] == 0x00 && (first[1] & 0x80) == 0; ++first) { \
+ } \
+ } \
+ out->assign(reinterpret_cast<const char*>(first), last - first + 1); \
+}
+
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal4Value)
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal8Value)
+SERIALIZE_ICEBERG_DECIMAL_SINGLE_VALUE(Decimal16Value)
+
} // end ns impala
#endif
diff --git a/be/src/exec/parquet/serialize-single-value-test.cc b/be/src/exec/parquet/serialize-single-value-test.cc
new file mode 100644
index 0000000..65e76a6
--- /dev/null
+++ b/be/src/exec/parquet/serialize-single-value-test.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+
+#include "parquet-column-stats.inline.h"
+#include "runtime/decimal-value.inline.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+TEST(SerializeSingleValueTest, Decimal) {  // Checks minimal-length decimal encoding.
+ Decimal4Value d4;
+ Decimal8Value d8;
+ bool overflow = false;
+ std::string out;
+
+ d4 = Decimal4Value::FromInt(9, 4, -1, &overflow);
+ EXPECT_FALSE(overflow);
+ ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+ // The unscaled value -10000 is 0xffffd8f0, but the result contains only the minimum
+ // number of bytes, so only 2 bytes are required.
+ EXPECT_EQ(std::string("\xD8\xF0", 2), out);
+
+ d4 = Decimal4Value::FromInt(9, 0, -1, &overflow);
+ EXPECT_FALSE(overflow);
+ ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+ // -1 is 0xffffffff, but the result contains only the minimum number of bytes.
+ // Only 1 byte is required.
+ EXPECT_EQ(std::string("\xFF", 1), out);
+
+ d8 = Decimal8Value::FromInt(18, 0, -2147483647, &overflow);
+ EXPECT_FALSE(overflow);
+ ColumnStats<Decimal8Value>::SerializeIcebergSingleValue(d8, &out);
+ // -2147483647 is 0xffffffff80000001. Only 4 bytes are required.
+ EXPECT_EQ(std::string("\x80\x00\x00\x01", 4), out);
+
+ d4 = Decimal4Value::FromInt(9, 4, 0, &overflow);
+ EXPECT_FALSE(overflow);
+ ColumnStats<Decimal4Value>::SerializeIcebergSingleValue(d4, &out);
+ // 0 is 0x00000000, but only 1 byte is required.
+ EXPECT_EQ(std::string("\x00", 1), out);
+
+ d8 = Decimal8Value::FromInt(18, 0, 2147483647, &overflow);
+ EXPECT_FALSE(overflow);
+ ColumnStats<Decimal8Value>::SerializeIcebergSingleValue(d8, &out);
+ // 2147483647 is 0x000000007fffffff. Only 4 bytes are required.
+ EXPECT_EQ(std::string("\x7F\xFF\xFF\xFF", 4), out);
+}
+}
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 5faad8d..862eab8 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -463,8 +463,66 @@ void DmlExecState::UpdatePartition(const string& partition_name,
MergeDmlStats(*insert_stats, entry->second.mutable_stats());
}
-void DmlExecState::AddCreatedFile(const OutputPartition& partition) {
+namespace {
+flatbuffers::Offset<org::apache::impala::fb::FbIcebergColumnStats>
+createIcebergColumnStats(
+ flatbuffers::FlatBufferBuilder& fbb, int field_id,
+ const IcebergColumnStats& col_stats) {
using namespace org::apache::impala::fb;
+
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lower_bound;
+ flatbuffers::Offset<flatbuffers::Vector<uint8_t>> upper_bound;
+ if (col_stats.has_min_max_values) {  // Bounds are copied only when they are present.
+ const std::string& min_binary = col_stats.min_binary;
+ const uint8_t* data = reinterpret_cast<const uint8_t*>(min_binary.data());
+ lower_bound = fbb.CreateVector(data, min_binary.size());
+
+ const std::string& max_binary = col_stats.max_binary;
+ data = reinterpret_cast<const uint8_t*>(max_binary.data());
+ upper_bound = fbb.CreateVector(data, max_binary.size());
+ }
+
+ FbIcebergColumnStatsBuilder stats_builder(fbb);  // Started after the vectors exist.
+ stats_builder.add_field_id(field_id);
+
+ stats_builder.add_total_compressed_byte_size(col_stats.column_size);
+ stats_builder.add_null_count(col_stats.null_count);
+
+ if (col_stats.has_min_max_values) {  // Bound fields stay unset without min/max.
+ stats_builder.add_lower_bound(lower_bound);
+ stats_builder.add_upper_bound(upper_bound);
+ }
+
+ return stats_builder.Finish();
+}
+
+string createIcebergDataFileString(
+ const string& partition_name, const string& final_path, int64_t num_rows,
+ int64_t file_size, const IcebergFileStats& insert_stats) {
+ using namespace org::apache::impala::fb;
+ flatbuffers::FlatBufferBuilder fbb;
+
+ vector<flatbuffers::Offset<FbIcebergColumnStats>> ice_col_stats_vec;
+ for (auto it = insert_stats.cbegin(); it != insert_stats.cend(); ++it) {
+ ice_col_stats_vec.push_back(createIcebergColumnStats(fbb, it->first, it->second));
+ }  // One entry per map element; the map key is passed as the field id.
+
+ flatbuffers::Offset<FbIcebergDataFile> data_file = CreateFbIcebergDataFile(fbb,
+ fbb.CreateString(final_path),
+ // Currently we can only write Parquet to Iceberg.
+ FbFileFormat::FbFileFormat_PARQUET,
+ num_rows,
+ file_size,
+ fbb.CreateString(partition_name),
+ fbb.CreateVector(ice_col_stats_vec));
+ fbb.Finish(data_file);  // The finished buffer is returned below as a byte string.
+ return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), fbb.GetSize());
+}
+
+}
+
+void DmlExecState::AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
+ const IcebergFileStats& insert_stats) {
lock_guard<mutex> l(lock_);
const string& partition_name = partition.partition_name;
PartitionStatusMap::iterator entry = per_partition_status_.find(partition_name);
@@ -478,21 +536,11 @@ void DmlExecState::AddCreatedFile(const OutputPartition& partition) {
}
file->set_num_rows(partition.current_file_rows);
file->set_size(partition.current_file_bytes);
-}
-
-string createIcebergDataFileString(
- const string& partition_name, const DmlFileStatusPb& file) {
- using namespace org::apache::impala::fb;
- flatbuffers::FlatBufferBuilder fbb;
- flatbuffers::Offset<FbIcebergDataFile> data_file = CreateFbIcebergDataFile(fbb,
- fbb.CreateString(file.final_path()),
- // Currently we can only write Parquet to Iceberg
- FbFileFormat::FbFileFormat_PARQUET,
- file.num_rows(),
- file.size(),
- fbb.CreateString(partition_name));
- fbb.Finish(data_file);
- return string(reinterpret_cast<char*>(fbb.GetBufferPointer()), fbb.GetSize());
+ if (is_iceberg) {
+ file->set_iceberg_data_file_fb(
+ createIcebergDataFileString(partition_name, file->final_path(), file->num_rows(),
+ file->size(), insert_stats));
+ }
}
vector<string> DmlExecState::CreateIcebergDataFilesVector() {
@@ -501,7 +549,9 @@ vector<string> DmlExecState::CreateIcebergDataFilesVector() {
for (const PartitionStatusMap::value_type& partition : per_partition_status_) {
for (int i = 0; i < partition.second.created_files_size(); ++i) {
const DmlFileStatusPb& file = partition.second.created_files(i);
- ret.emplace_back(createIcebergDataFileString(partition.first, file));
+ if (file.has_iceberg_data_file_fb()) {
+ ret.push_back(file.iceberg_data_file_fb());
+ }
}
}
return ret;
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index c6b9a1a..ecb150f 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -24,12 +24,14 @@
#include "common/hdfs.h"
#include "common/status.h"
+#include "exec/hdfs-table-writer.h"
namespace impala {
class DmlExecStatusPB;
class DmlPartitionStatusPB;
class DmlStatsPB;
+class DmlDataFileStatsPB;
struct OutputPartition;
class TDmlResult;
class TFinalizeParams;
@@ -71,7 +73,9 @@ class DmlExecState {
int64_t num_modified_rows_delta, const DmlStatsPB* insert_stats);
/// Extract information from 'partition', and add a new Iceberg data file.
- void AddCreatedFile(const OutputPartition& partition);
+ /// 'insert_stats' contains stats for the Iceberg data file.
+ void AddCreatedFile(const OutputPartition& partition, bool is_iceberg,
+ const IcebergFileStats& insert_stats);
/// Used to initialize this state when execute Kudu DML. Must be called before
/// SetKuduDmlStats().
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 87ab094..b813880 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -204,6 +204,11 @@ class BitUtil {
/// Converts to big endian format (if not already in big endian) from the
/// machine's native endian format.
#if __BYTE_ORDER == __LITTLE_ENDIAN
+ static inline __int128_t ToBigEndian(__int128_t value) {
+ __int128_t res = 0;
+ ByteSwap(&res, &value, sizeof(__int128_t));  // Pointer-based ByteSwap for 128 bits.
+ return res;
+ }
static inline int64_t ToBigEndian(int64_t value) { return ByteSwap(value); }
static inline uint64_t ToBigEndian(uint64_t value) { return ByteSwap(value); }
static inline int32_t ToBigEndian(int32_t value) { return ByteSwap(value); }
@@ -211,6 +216,7 @@ class BitUtil {
static inline int16_t ToBigEndian(int16_t value) { return ByteSwap(value); }
static inline uint16_t ToBigEndian(uint16_t value) { return ByteSwap(value); }
#else
+ static inline __int128_t ToBigEndian(__int128_t val) { return val; }
static inline int64_t ToBigEndian(int64_t val) { return val; }
static inline uint64_t ToBigEndian(uint64_t val) { return val; }
static inline int32_t ToBigEndian(int32_t val) { return val; }
@@ -221,6 +227,11 @@ class BitUtil {
/// Converts from big endian format to the machine's native endian format.
#if __BYTE_ORDER == __LITTLE_ENDIAN
+ static inline __int128_t FromBigEndian(__int128_t value) {
+ __int128_t res = 0;
+ ByteSwap(&res, &value, sizeof(__int128_t));  // Pointer-based ByteSwap for 128 bits.
+ return res;
+ }
static inline int64_t FromBigEndian(int64_t value) { return ByteSwap(value); }
static inline uint64_t FromBigEndian(uint64_t value) { return ByteSwap(value); }
static inline int32_t FromBigEndian(int32_t value) { return ByteSwap(value); }
@@ -228,6 +239,7 @@ class BitUtil {
static inline int16_t FromBigEndian(int16_t value) { return ByteSwap(value); }
static inline uint16_t FromBigEndian(uint16_t value) { return ByteSwap(value); }
#else
+ static inline __int128_t FromBigEndian(__int128_t val) { return val; }
static inline int64_t FromBigEndian(int64_t val) { return val; }
static inline uint64_t FromBigEndian(uint64_t val) { return val; }
static inline int32_t FromBigEndian(int32_t val) { return val; }
diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index 80d7f1d..585c300 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -22,11 +22,20 @@ enum FbFileFormat: byte {
ORC
}
+table FbIcebergColumnStats {
+ field_id: int;  // Iceberg field id of the column.
+ total_compressed_byte_size: long;  // Compressed size of the column in the data file.
+ null_count: long;  // Number of null values in the column.
+ lower_bound: [ubyte];  // Single-value serialized lower bound; unset without min/max.
+ upper_bound: [ubyte];  // Single-value serialized upper bound; unset without min/max.
+}
+
table FbIcebergDataFile {
path: string;
format: FbFileFormat = PARQUET;
record_count: long = 0;
file_size_in_bytes: long = 0;
partition_path: string;
+ per_column_stats: [FbIcebergColumnStats];
}
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 69a286a..4a4a3cb 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -63,6 +63,10 @@ message DmlFileStatusPb {
// TODO: this could be merged with final_path by storing only the suffix
// of the path and append it to a staging/final prefix from DmlPartitionStatusPB
optional string staging_path = 4;
+
+ // Flat buffer encoded Iceberg data file (see FbIcebergDataFile). Contains metadata and
+ // stats for the iceberg file.
+ optional bytes iceberg_data_file_fb = 5;
}
// Per-partition statistics and metadata resulting from DML statements.
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 01351ad..33a47df 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -28,13 +28,14 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFiles;
-import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
@@ -47,6 +48,7 @@ import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.fb.FbIcebergColumnStats;
import org.apache.impala.fb.FbIcebergDataFile;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TCreateTableParams;
@@ -273,9 +275,12 @@ public class IcebergCatalogOpExecutor {
}
for (ByteBuffer buf : dataFilesFb) {
FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
+
PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
+ Metrics metrics = buildDataFileMetrics(feIcebergTable, dataFile);
DataFiles.Builder builder =
DataFiles.builder(partSpec)
+ .withMetrics(metrics)
.withPath(dataFile.path())
.withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(dataFile.format()))
.withRecordCount(dataFile.recordCount())
@@ -289,6 +294,32 @@ public class IcebergCatalogOpExecutor {
batchWrite.commit();
}
+ private static Metrics buildDataFileMetrics(FeIcebergTable feIcebergTable,
+ FbIcebergDataFile dataFile) {  // NOTE(review): 'feIcebergTable' is unused here.
+ Map<Integer, Long> columnSizes = new HashMap<>();
+ Map<Integer, Long> nullValueCounts = new HashMap<>();
+ Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+ Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+ for (int i = 0; i < dataFile.perColumnStatsLength(); ++i) {
+ FbIcebergColumnStats stats = dataFile.perColumnStats(i);
+ if (stats != null) {
+ int fieldId = stats.fieldId();
+ if (fieldId != -1) {  // Entries with field id -1 are skipped.
+ columnSizes.put(fieldId, stats.totalCompressedByteSize());
+ nullValueCounts.put(fieldId, stats.nullCount());
+ if (stats.lowerBoundLength() > 0) {  // Zero-length bounds are not recorded.
+ lowerBounds.put(fieldId, stats.lowerBoundAsByteBuffer());
+ }
+ if (stats.upperBoundLength() > 0) {  // Zero-length bounds are not recorded.
+ upperBounds.put(fieldId, stats.upperBoundAsByteBuffer());
+ }
+ }
+ }
+ }
+ return new Metrics(dataFile.recordCount(), columnSizes, null,  // presumably valueCounts
+ nullValueCounts, null, lowerBounds, upperBounds);  // null: presumably nanValueCounts
+ }
+
/**
* Creates new snapshot for the iceberg table by deleting all data files.
*/
diff --git a/infra/python/deps/requirements.txt b/infra/python/deps/requirements.txt
index f86f084..cefff6d 100644
--- a/infra/python/deps/requirements.txt
+++ b/infra/python/deps/requirements.txt
@@ -66,6 +66,7 @@ six == 1.14.0
sqlparse == 0.3.1
texttable == 0.8.3
virtualenv == 16.7.10
+avro==1.10.2
# Required for Kudu:
Cython == 0.29.14
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
index 1d75729..01a0513 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
@@ -106,7 +106,7 @@ where i = 2;
INT,STRING,DATE,TIMESTAMP
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -142,7 +142,7 @@ where t = '2021-01-03 03:03:03.030000000';
INT,STRING,DATE,TIMESTAMP
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test single col TRUNCATE
@@ -269,7 +269,7 @@ where s = 'the quick brown fox jumps over the lazy dog';
INT,BIGINT,DECIMAL,STRING
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 2
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -280,7 +280,7 @@ where s = 'the quick impala';
INT,BIGINT,DECIMAL,STRING
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumRowGroups): 0
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -402,7 +402,7 @@ where t = '1969-02-15 13:55:03';
TIMESTAMP,DATE
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -452,7 +452,7 @@ where d = '1969-12-15';
TIMESTAMP,DATE
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Create table with MONTH partition transform
@@ -560,7 +560,7 @@ where d = '1969-12-15';
TIMESTAMP,DATE
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Create table with DAY partition transform
@@ -739,7 +739,7 @@ where t = '1969-12-31 22:55:03';
TIMESTAMP
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -751,7 +751,7 @@ where t = '1969-12-31 23:55:03';
TIMESTAMP
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
@@ -827,7 +827,7 @@ where s = 'quick brown dog';
STRING,BIGINT,DECIMAL,TIMESTAMP,DATE
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 1
-aggregation(SUM, NumRowGroups): 3
+aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# Test partition pruning with RUNTIME_PROFILE.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index 6ee2edd..c8d3a68 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -211,7 +211,8 @@ BIGINT
aggregation(SUM, NumRowGroups): 4
====
---- QUERY
-# 'timestamp_col' is not a partitioning column.
+# 'timestamp_col' is not a partitioning column, but min/max stats will be used to
+# eliminate row groups
select count(*) from alltypes_part
where timestamp_col = now();
---- RESULTS
@@ -219,7 +220,7 @@ where timestamp_col = now();
---- TYPES
BIGINT
---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 8
+aggregation(SUM, NumRowGroups): 0
====
---- QUERY
# Iceberg partitions independent of column order
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
new file mode 100644
index 0000000..d20fef8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -0,0 +1,500 @@
+====
+---- QUERY
+# Test STRING/INT/BIGINT/FLOAT/DOUBLE metrics
+create table ice_types1 (p INT, s STRING, i INT, bi BIGINT, f FLOAT, db DOUBLE)
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types1 values
+ (0, 'aluminium', 1, 1, 0.126, 0.126), (0, 'bromine', 2, 2, 0.131, 0.131), (0, 'carbon', 3, 3, 3.45, 3.45),
+ (1, 'dubnium', 4, 4, 3.567, 3.567), (1, 'europium', 5, 5, 3.99, 3.99), (1, 'fermium', 6, 6, 4.01, 4.01),
+ (2, 'fermium', 6, 6, 4.01, 4.01), (2, 'helium', 8, 8, 4.65, 4.65), (2, 'iridium', 9, 9, 4.67, 4.67);
+select count(*) from ice_types1;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types1;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types1/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where s >= 'z';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where s >= 'b' and s <= 'cz';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where s >= 'b' and s <= 'dz';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 'fermium' is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where s = 'fermium';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where i >= 10;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where i >= 2 and i <= 3;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where i >= 2 and i <= 4;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 6 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where i = 6;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where bi >= 10;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where bi >= 2 and bi <= 3;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where bi >= 2 and bi <= 4;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 6 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where bi = 6;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where f >= 5.0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where f >= 0.13 and f <= 3.451;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where f >= 0.13 and f <= 3.57;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 4.01 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where f = 4.01;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types1
+where db >= 5.0;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types1
+where db >= 0.13 and db <= 3.451;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types1
+where db >= 0.13 and db <= 3.57;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 4.01 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types1
+where db = 4.01;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Test DATE/TIMESTAMP metrics
+create table ice_types2 (p INT, dt DATE, ts TIMESTAMP)
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types2 values
+ (0, DATE'1400-01-01', '1400-01-01 00:00:00'), (0, DATE'1969-12-31', '1969-12-31 12:34:59'), (0, DATE'1970-01-01', '1969-12-31 12:35:00 '),
+ (1, DATE'1970-01-02', '1969-12-31 12:35:01'), (1, DATE'1999-12-30', '1999-12-30 01:11:35'), (1, DATE'1999-12-31', '1999-12-31 23:59:59'),
+ (2, DATE'1999-12-31', '1999-12-31 23:59:59'), (2, DATE'9999-12-30', '9999-12-30 00:23:42'), (2, DATE'9999-12-30', '9999-12-31 03:04:05');
+select count(*) from ice_types2;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types2;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types2/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types2
+where dt >= DATE'9999-12-31';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types2
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-01';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types2
+where dt >= DATE'1969-12-31' and dt <= DATE'1970-01-02';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 1999-12-31 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types2
+where dt = DATE'1999-12-31';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types2
+where ts >= '9999-12-31 03:04:06';
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types2
+where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:00';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types2
+where ts >= '1969-12-31 12:34:59' and ts <= '1969-12-31 12:35:01';
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# '1999-12-31 23:59:59' is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types2
+where ts = '1999-12-31 23:59:59';
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Test DECIMAL metrics
+create table ice_types3 (p INT, d1 DECIMAL(9, 3), d2 DECIMAL(18, 3), d3 DECIMAL(38, 3))
+partitioned by spec (p)
+stored as iceberg
+tblproperties ('write.format.default'='parquet');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_types3 values
+ (0, 123.456, 1234567890.123, 1234567890123456789.012),
+ (0, 123.457, 1234567890.124, 1234567890123456789.013),
+ (0, 123.458, 1234567890.125, 1234567890123456789.014),
+ (1, 123.459, 1234567890.126, 1234567890123456789.015),
+ (1, 333.333, 3333333333.333, 3333333333333333333.333),
+ (1, 341.234, 3412345678.901, 3412345678901234567.89),
+ (2, 341.234, 3412345678.901, 3412345678901234567.89),
+ (2, 123456.789, 123456789012345.678, 12345678901234567890123456789012345.678),
+ (2, 123456.790, 123456789012345.679, 12345678901234567890123456789012345.68);
+select count(*) from ice_types3;
+---- RESULTS
+9
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 3
+====
+---- QUERY
+show files in ice_types3;
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_types3/data/p=2/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d1 >= 123456.791;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d1 >= 123.457 and d1 <= 123.458;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d1 >= 123.457 and d1 <= 123.459;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 341.234 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d1 = 341.234;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d2 >= 123456789012345.680;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d2 >= 1234567890.124 and d2 <= 1234567890.125;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d2 >= 1234567890.124 and d2 <= 1234567890.126;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 3412345678.901 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d2 = 3412345678.901;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# Lower/upper bounds metrics eliminate all row groups
+select count(*) from ice_types3
+where d3 >= 12345678901234567890123456789012345.681;
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, RowsRead): 0
+====
+---- QUERY
+# Where condition matches one row group's metrics
+select count(*) from ice_types3
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.014;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
+# Where condition spans over 2 row groups
+select count(*) from ice_types3
+where d3 >= 1234567890123456789.013 and d3 <= 1234567890123456789.015;
+---- RESULTS
+3
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
+---- QUERY
+# 3412345678901234567.89 is the upper bound of one row group and the lower bound of another
+select count(*) from ice_types3
+where d3 = 3412345678901234567.89;
+---- RESULTS
+2
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, RowsRead): 6
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 987dd42..489e384 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -23,6 +23,10 @@ import time
from subprocess import check_call
from parquet.ttypes import ConvertedType
+from avro.datafile import DataFileReader
+from avro.io import DatumReader
+import json
+
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.skip import SkipIf
@@ -263,3 +267,181 @@ class TestIcebergTable(ImpalaTestSuite):
assert a_schema_element.converted_type == ConvertedType.UTF8
os.remove(local_file)
+
+ # Get hdfs path to manifest list that belongs to the snapshot identified by
+ # 'snapshot_counter'.
+ def get_manifest_list_hdfs_path(self, tmp_path_prefix, db_name, table_name,
+ snapshot_counter):
+ local_path = '%s_%s.metadata.json' % (tmp_path_prefix, random.randint(0, 10000))
+ hdfs_path = get_fs_path('/test-warehouse/%s.db/%s/metadata/%s*.metadata.json'
+ % (db_name, table_name, snapshot_counter))
+ check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
+
+ manifest_list_hdfs_path = None
+ try:
+ with open(local_path, 'r') as fp:
+ metadata = json.load(fp)
+ current_snapshot_id = metadata['current-snapshot-id']
+ for snapshot in metadata['snapshots']:
+ if snapshot['snapshot-id'] == current_snapshot_id:
+ manifest_list_hdfs_path = snapshot['manifest-list']
+ break
+ finally:
+ os.remove(local_path)
+ return manifest_list_hdfs_path
+
+ # Get list of hdfs paths to manifest files from the manifest list avro file.
+ def get_manifest_hdfs_path_list(self, tmp_path_prefix, manifest_list_hdfs_path):
+ local_path = '%s_%s.manifest_list.avro' % (tmp_path_prefix, random.randint(0, 10000))
+ check_call(['hadoop', 'fs', '-copyToLocal', manifest_list_hdfs_path, local_path])
+
+ manifest_hdfs_path_list = []
+ reader = None
+ try:
+ with open(local_path, 'rb') as fp:
+ reader = DataFileReader(fp, DatumReader())
+ for manifest in reader:
+ manifest_hdfs_path_list.append(manifest['manifest_path'])
+ finally:
+ if reader:
+ reader.close()
+ os.remove(local_path)
+ return manifest_hdfs_path_list
+
+ # Get 'data_file' structs from avro manifest files.
+ def get_data_file_list(self, tmp_path_prefix, manifest_hdfs_path_list):
+ datafiles = []
+ for hdfs_path in manifest_hdfs_path_list:
+ local_path = '%s_%s.manifest.avro' % (tmp_path_prefix, random.randint(0, 10000))
+ check_call(['hadoop', 'fs', '-copyToLocal', hdfs_path, local_path])
+
+ reader = None
+ try:
+ with open(local_path, 'rb') as fp:
+ reader = DataFileReader(fp, DatumReader())
+ datafiles.extend([rec['data_file'] for rec in reader])
+ finally:
+ if reader:
+ reader.close()
+ os.remove(local_path)
+ return datafiles
+
+ @SkipIf.not_hdfs
+ def test_writing_metrics_to_metadata(self, vector, unique_database):
+ # Create table
+ table_name = "ice_stats"
+ qualified_table_name = "%s.%s" % (unique_database, table_name)
+ query = 'create table %s ' \
+ '(s string, i int, b boolean, bi bigint, ts timestamp, dt date, ' \
+ 'dc decimal(10, 3)) ' \
+ 'stored as iceberg' \
+ % qualified_table_name
+ self.client.execute(query)
+
+ # Insert data
+ # 1st data file:
+ query = 'insert into %s values ' \
+ '("abc", 3, true, NULL, "1970-01-03 09:11:22", NULL, 56.34), ' \
+ '("def", NULL, false, NULL, "1969-12-29 14:45:59", DATE"1969-01-01", -10.0), ' \
+ '("ghij", 1, NULL, 123456789000000, "1970-01-01", DATE"1970-12-31", NULL), ' \
+ '(NULL, 0, NULL, 234567890000001, NULL, DATE"1971-01-01", NULL)' \
+ % qualified_table_name
+ self.execute_query(query)
+ # 2nd data file:
+ query = 'insert into %s values ' \
+ '(NULL, NULL, NULL, NULL, NULL, NULL, NULL), ' \
+ '(NULL, NULL, NULL, NULL, NULL, NULL, NULL)' \
+ % qualified_table_name
+ self.execute_query(query)
+
+ # Get hdfs path to manifest list file
+ manifest_list_hdfs_path = self.get_manifest_list_hdfs_path(
+ '/tmp/iceberg_metrics_test', unique_database, table_name, '00002')
+
+ # Get the list of hdfs paths to manifest files
+ assert manifest_list_hdfs_path is not None
+ manifest_hdfs_path_list = self.get_manifest_hdfs_path_list(
+ '/tmp/iceberg_metrics_test', manifest_list_hdfs_path)
+
+ # Get 'data_file' records from manifest files.
+ assert manifest_hdfs_path_list is not None and len(manifest_hdfs_path_list) > 0
+ datafiles = self.get_data_file_list('/tmp/iceberg_metrics_test',
+ manifest_hdfs_path_list)
+
+ # Check column stats in datafiles
+ assert datafiles is not None and len(datafiles) == 2
+
+ # The 1st datafile contains the 2 NULL rows
+ assert datafiles[0]['record_count'] == 2
+ assert datafiles[0]['column_sizes'] == \
+ [{'key': 1, 'value': 39},
+ {'key': 2, 'value': 39},
+ {'key': 3, 'value': 25},
+ {'key': 4, 'value': 39},
+ {'key': 5, 'value': 39},
+ {'key': 6, 'value': 39},
+ {'key': 7, 'value': 39}]
+ assert datafiles[0]['null_value_counts'] == \
+ [{'key': 1, 'value': 2},
+ {'key': 2, 'value': 2},
+ {'key': 3, 'value': 2},
+ {'key': 4, 'value': 2},
+ {'key': 5, 'value': 2},
+ {'key': 6, 'value': 2},
+ {'key': 7, 'value': 2}]
+ # Upper/lower bounds should be empty lists
+ assert datafiles[0]['lower_bounds'] == []
+ assert datafiles[0]['upper_bounds'] == []
+
+ # 2nd datafile
+ assert datafiles[1]['record_count'] == 4
+ assert datafiles[1]['column_sizes'] == \
+ [{'key': 1, 'value': 66},
+ {'key': 2, 'value': 56},
+ {'key': 3, 'value': 26},
+ {'key': 4, 'value': 59},
+ {'key': 5, 'value': 68},
+ {'key': 6, 'value': 56},
+ {'key': 7, 'value': 53}]
+ assert datafiles[1]['null_value_counts'] == \
+ [{'key': 1, 'value': 1},
+ {'key': 2, 'value': 1},
+ {'key': 3, 'value': 2},
+ {'key': 4, 'value': 2},
+ {'key': 5, 'value': 1},
+ {'key': 6, 'value': 1},
+ {'key': 7, 'value': 2}]
+ assert datafiles[1]['lower_bounds'] == \
+ [{'key': 1, 'value': 'abc'},
+ # INT is serialized as 4-byte little endian
+ {'key': 2, 'value': '\x00\x00\x00\x00'},
+ # BOOLEAN is serialized as 0x00 for FALSE
+ {'key': 3, 'value': '\x00'},
+ # BIGINT is serialized as 8-byte little endian
+ {'key': 4, 'value': '\x40\xaf\x0d\x86\x48\x70\x00\x00'},
+ # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
+ # 1970-01-01 00:00:00)
+ {'key': 5, 'value': '\xc0\xd7\xff\x06\xd0\xff\xff\xff'},
+ # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
+ {'key': 6, 'value': '\x93\xfe\xff\xff'},
+ # Unlike other numerical values, DECIMAL is serialized as big-endian.
+ {'key': 7, 'value': '\xd8\xf0'}]
+ assert datafiles[1]['upper_bounds'] == \
+ [{'key': 1, 'value': 'ghij'},
+ # INT is serialized as 4-byte little endian
+ {'key': 2, 'value': '\x03\x00\x00\x00'},
+ # BOOLEAN is serialized as 0x01 for TRUE
+ {'key': 3, 'value': '\x01'},
+ # BIGINT is serialized as 8-byte little endian
+ {'key': 4, 'value': '\x81\x58\xc2\x97\x56\xd5\x00\x00'},
+ # TIMESTAMP is serialized as 8-byte little endian (number of microseconds since
+ # 1970-01-01 00:00:00)
+ {'key': 5, 'value': '\x80\x02\x86\xef\x2f\x00\x00\x00'},
+ # DATE is serialized as 4-byte little endian (number of days since 1970-01-01)
+ {'key': 6, 'value': '\x6d\x01\x00\x00'},
+ # Unlike other numerical values, DECIMAL is serialized as big-endian.
+ {'key': 7, 'value': '\x00\xdc\x14'}]
+
+ def test_using_upper_lower_bound_metrics(self, vector, unique_database):
+ self.run_test_case('QueryTest/iceberg-upper-lower-bound-metrics', vector,
+ use_db=unique_database)