You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/07/21 00:12:49 UTC

[impala] 02/02: IMPALA-10627: Use standard parquet-related Iceberg table properties

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fabe994d1fb011afb88d1f0f5bf078113775c9db
Author: Attila Jeges <at...@cloudera.com>
AuthorDate: Thu Jun 3 16:59:34 2021 +0200

    IMPALA-10627: Use standard parquet-related Iceberg table properties
    
    This patch adds support for the following standard Iceberg properties:
    
    write.parquet.compression-codec:
      Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY
      (default value), LZ4, ZSTD. The table property will be ignored if
      COMPRESSION_CODEC query option is set.
    
    write.parquet.compression-level:
      Parquet compression level. Used with ZSTD compression only.
      Supported range is [1, 22]. Default value is 3. The table property
      will be ignored if COMPRESSION_CODEC query option is set.
    
    write.parquet.row-group-size-bytes:
      Parquet row group size in bytes. Supported range is [8388608,
      2146435072] (8MB - 2047MB). The table property will be ignored if
      PARQUET_FILE_SIZE query option is set.
      If neither the table property nor the PARQUET_FILE_SIZE query option
      is set, the way Impala calculates row group size will remain
      unchanged.
    
    write.parquet.page-size-bytes:
      Parquet page size in bytes. Used for PLAIN encoding. Supported range
      is [65536, 1073741824] (64KB - 1GB).
      If the table property is unset, the way Impala calculates page size
      will remain unchanged.
    
    write.parquet.dict-size-bytes:
      Parquet dictionary page size in bytes. Used for dictionary encoding.
      Supported range is [65536, 1073741824] (64KB - 1GB).
      If the table property is unset, the way Impala calculates dictionary
      page size will remain unchanged.
    
    This patch also renames 'iceberg.file_format' table property to
    'write.format.default' which is the standard Iceberg name for the
    table property.
    
    Change-Id: I3b8aa9a52c13c41b48310d2f7c9c7426e1ff5f23
    Reviewed-on: http://gerrit.cloudera.org:8080/17654
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   | 124 ++++++++----
 be/src/exec/parquet/hdfs-parquet-table-writer.h    |  25 ++-
 be/src/runtime/descriptors.cc                      |   4 +
 be/src/runtime/descriptors.h                       |  17 ++
 common/thrift/CatalogObjects.thrift                |  11 +-
 .../analysis/AlterTableSetTblProperties.java       |  30 +++
 .../apache/impala/analysis/CreateTableStmt.java    |  65 ++++++-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  97 ++++++++-
 .../org/apache/impala/catalog/IcebergTable.java    |  85 +++++++-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  29 +++
 .../impala/catalog/local/LocalIcebergTable.java    |  29 +++
 .../java/org/apache/impala/util/IcebergUtil.java   | 141 ++++++++++++++
 .../functional/functional_schema_template.sql      |  15 +-
 .../queries/QueryTest/iceberg-alter.test           |  52 ++++-
 .../queries/QueryTest/iceberg-catalogs.test        |  10 +-
 .../queries/QueryTest/iceberg-create.test          |   8 +-
 .../queries/QueryTest/iceberg-insert.test          | 134 +++++++++++++
 .../queries/QueryTest/iceberg-negative.test        | 216 ++++++++++++++++++++-
 .../queries/QueryTest/iceberg-query.test           |  10 +-
 .../queries/QueryTest/show-create-table.test       | 160 +++++++++++----
 20 files changed, 1147 insertions(+), 115 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 01ab616..718609d 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -124,7 +124,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     : parent_(parent),
       expr_eval_(expr_eval),
       codec_info_(codec_info),
-      page_size_(DEFAULT_DATA_PAGE_SIZE),
+      plain_page_size_(parent->default_plain_page_size()),
       current_page_(nullptr),
       num_values_(0),
       total_compressed_byte_size_(0),
@@ -356,12 +356,14 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   // compressed.
   scoped_ptr<Codec> compressor_;
 
-  // Size of newly created pages. Defaults to DEFAULT_DATA_PAGE_SIZE and is increased
-  // when pages are not big enough. This only happens when there are enough unique values
-  // such that we switch from PLAIN_DICTIONARY/RLE_DICTIONARY to PLAIN encoding and then
-  // have very large values (i.e. greater than DEFAULT_DATA_PAGE_SIZE).
+  // Size of newly created PLAIN encoded pages. Defaults to DEFAULT_DATA_PAGE_SIZE or to
+  // the value of 'write.parquet.page-size-bytes' table property for Iceberg tables.
+  // Its value is increased when pages are not big enough. This only happens when there
+  // are enough unique values such that we switch from PLAIN_DICTIONARY/RLE_DICTIONARY to
+  // PLAIN encoding and then have very large values (i.e. greater than
+  // DEFAULT_DATA_PAGE_SIZE).
   // TODO: Consider removing and only creating a single large page as necessary.
-  int64_t page_size_;
+  int64_t plain_page_size_;
 
   // Pages belong to this column chunk. We need to keep them in memory in order to write
   // them together.
@@ -506,7 +508,9 @@ class HdfsParquetTableWriter::ColumnWriter :
       if (UNLIKELY(num_values_since_dict_size_check_ >=
                    DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
         num_values_since_dict_size_check_ = 0;
-        if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
+        if (dict_encoder_->EstimatedDataEncodedSize() >= parent_->dict_page_size()) {
+          return false;
+        }
       }
       ++num_values_since_dict_size_check_;
       *bytes_needed = dict_encoder_->Put(*val);
@@ -522,7 +526,8 @@ class HdfsParquetTableWriter::ColumnWriter :
       *bytes_needed = plain_encoded_value_size_ < 0 ?
           ParquetPlainEncoder::ByteSize<T>(*val) :
           plain_encoded_value_size_;
-      if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
+      if (current_page_->header.uncompressed_page_size + *bytes_needed >
+          plain_page_size_) {
         return false;
       }
       uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
@@ -917,7 +922,7 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
 
     // Check how much space is needed to write this value. If that is larger than the
     // page size then increase page size and try again.
-    if (UNLIKELY(bytes_needed > page_size_)) {
+    if (UNLIKELY(bytes_needed > plain_page_size_)) {
       if (bytes_needed > MAX_DATA_PAGE_SIZE) {
         stringstream ss;
         ss << "Cannot write value of size "
@@ -926,8 +931,8 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
            << PrettyPrinter::Print(MAX_DATA_PAGE_SIZE , TUnit::BYTES) << ".";
         return Status(ss.str());
       }
-      page_size_ = bytes_needed;
-      values_buffer_len_ = page_size_;
+      plain_page_size_ = bytes_needed;
+      values_buffer_len_ = plain_page_size_;
       values_buffer_ = parent_->reusable_col_mem_pool_->Allocate(values_buffer_len_);
     }
     NewPage();
@@ -1205,25 +1210,72 @@ HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeSta
     file_size_limit_(0),
     reusable_col_mem_pool_(new MemPool(parent_->mem_tracker())),
     per_file_mem_pool_(new MemPool(parent_->mem_tracker())),
-    row_idx_(0) {
+    row_idx_(0),
+    default_block_size_(0),
+    default_plain_page_size_(0),
+    dict_page_size_(0) {
   is_iceberg_file_ = table_desc->IsIcebergTable();
 }
 
 HdfsParquetTableWriter::~HdfsParquetTableWriter() {
 }
 
-void HdfsParquetTableWriter::ConfigureTimestampType() {
-  if (is_iceberg_file_) {
-    // The Iceberg spec states that timestamps are stored as INT64 micros.
-    timestamp_type_ = TParquetTimestampType::INT64_MICROS;
-    return;
-  }
+void HdfsParquetTableWriter::Configure() {
+  DCHECK(!is_iceberg_file_);
+
   timestamp_type_ = state_->query_options().parquet_timestamp_type;
+
+  string_utf8_ = state_->query_options().parquet_annotate_strings_utf8;
+
+  if (state_->query_options().__isset.parquet_file_size &&
+      state_->query_options().parquet_file_size > 0) {
+    // If the user specified a value explicitly, use it. InitNewFile() will verify that
+    // the actual file's block size is sufficient.
+    default_block_size_ = state_->query_options().parquet_file_size;
+  } else {
+    default_block_size_ = HDFS_BLOCK_SIZE;
+    // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
+    // which case a per-column minimum kicks in.
+    default_block_size_ = max(default_block_size_, MinBlockSize(columns_.size()));
+  }
+  // HDFS does not like block sizes that are not aligned
+  default_block_size_ = BitUtil::RoundUp(default_block_size_, HDFS_BLOCK_ALIGNMENT);
+
+  default_plain_page_size_ = DEFAULT_DATA_PAGE_SIZE;
+  dict_page_size_ = DEFAULT_DATA_PAGE_SIZE;
 }
 
-void HdfsParquetTableWriter::ConfigureStringType() {
-  string_utf8_ = is_iceberg_file_ ||
-                 state_->query_options().parquet_annotate_strings_utf8;
+void HdfsParquetTableWriter::ConfigureForIceberg() {
+  DCHECK(is_iceberg_file_);
+
+  // The Iceberg spec states that timestamps are stored as INT64 micros.
+  timestamp_type_ = TParquetTimestampType::INT64_MICROS;
+
+  string_utf8_ = true;
+
+  if (state_->query_options().__isset.parquet_file_size &&
+      state_->query_options().parquet_file_size > 0) {
+    // If the user specified a value explicitly, use it. InitNewFile() will verify that
+    // the actual file's block size is sufficient.
+    default_block_size_ = state_->query_options().parquet_file_size;
+  } else if (table_desc_->IcebergParquetRowGroupSize() > 0) {
+    // Otherwise, if the 'write.parquet.row-group-size-bytes' table property is set, use
+    // it. InitNewFile() will verify that the actual file's block size is sufficient.
+    default_block_size_ = table_desc_->IcebergParquetRowGroupSize();
+  } else {
+    default_block_size_ = HDFS_BLOCK_SIZE;
+    // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
+    // which case a per-column minimum kicks in.
+    default_block_size_ = max(default_block_size_, MinBlockSize(columns_.size()));
+  }
+  // HDFS does not like block sizes that are not aligned
+  default_block_size_ = BitUtil::RoundUp(default_block_size_, HDFS_BLOCK_ALIGNMENT);
+
+  default_plain_page_size_ = table_desc_->IcebergParquetPlainPageSize();
+  if (default_plain_page_size_ <= 0) default_plain_page_size_ = DEFAULT_DATA_PAGE_SIZE;
+
+  dict_page_size_ = table_desc_->IcebergParquetDictPageSize();
+  if (dict_page_size_ <= 0) dict_page_size_ = DEFAULT_DATA_PAGE_SIZE;
 }
 
 Status HdfsParquetTableWriter::Init() {
@@ -1243,6 +1295,12 @@ Status HdfsParquetTableWriter::Init() {
   if (query_options.__isset.compression_codec) {
     codec = query_options.compression_codec.codec;
     clevel = query_options.compression_codec.compression_level;
+  } else if (table_desc_->IsIcebergTable()) {
+    TCompressionCodec compression_codec = table_desc_->IcebergParquetCompressionCodec();
+    codec = compression_codec.codec;
+    if (compression_codec.__isset.compression_level) {
+      clevel = compression_codec.compression_level;
+    }
   }
 
   if (!(codec == THdfsCompression::NONE ||
@@ -1290,8 +1348,11 @@ Status HdfsParquetTableWriter::Init() {
 
   Codec::CodecInfo codec_info(codec, clevel);
 
-  ConfigureTimestampType();
-  ConfigureStringType();
+  if (is_iceberg_file_) {
+    ConfigureForIceberg();
+  } else {
+    Configure();
+  }
 
   columns_.resize(num_cols);
   // Initialize each column structure.
@@ -1439,23 +1500,6 @@ int64_t HdfsParquetTableWriter::MinBlockSize(int64_t num_file_cols) const {
   return 3 * DEFAULT_DATA_PAGE_SIZE * num_file_cols;
 }
 
-uint64_t HdfsParquetTableWriter::default_block_size() const {
-  int64_t block_size;
-  if (state_->query_options().__isset.parquet_file_size &&
-      state_->query_options().parquet_file_size > 0) {
-    // If the user specified a value explicitly, use it. InitNewFile() will verify that
-    // the actual file's block size is sufficient.
-    block_size = state_->query_options().parquet_file_size;
-  } else {
-    block_size = HDFS_BLOCK_SIZE;
-    // Blocks are usually HDFS_BLOCK_SIZE bytes, unless there are many columns, in
-    // which case a per-column minimum kicks in.
-    block_size = max(block_size, MinBlockSize(columns_.size()));
-  }
-  // HDFS does not like block sizes that are not aligned
-  return BitUtil::RoundUp(block_size, HDFS_BLOCK_ALIGNMENT);
-}
-
 Status HdfsParquetTableWriter::InitNewFile() {
   DCHECK(current_row_group_ == nullptr);
 
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
index cdc6c08..ca6d7a3 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.h
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -76,11 +76,13 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   virtual void Close() override;
 
   /// Returns the target HDFS block size to use.
-  virtual uint64_t default_block_size() const override;
+  virtual uint64_t default_block_size() const override { return default_block_size_; }
 
   virtual std::string file_extension() const override { return "parq"; }
 
   int32_t page_row_count_limit() const { return page_row_count_limit_; }
+  int64_t default_plain_page_size() const { return default_plain_page_size_; }
+  int64_t dict_page_size() const { return dict_page_size_; }
 
  private:
   /// Default data page size. In bytes.
@@ -162,11 +164,17 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
   /// new row group.  current_row_group_ will be flushed.
   Status AddRowGroup();
 
+  /// Configures writer for non-Iceberg tables:
   /// Selects the Parquet timestamp type to be used by this writer.
-  void ConfigureTimestampType();
-
   /// Sets 'string_utf8_' based on query options and table type.
-  void ConfigureStringType();
+  /// Sets 'default_block_size_', 'default_plain_page_size_' and 'dict_page_size_'.
+  void Configure();
+
+  /// Configures writer for Iceberg tables:
+  /// Selects the Parquet timestamp type to be used by this writer.
+  /// Sets 'string_utf8_' to true.
+  /// Sets 'default_block_size_', 'default_plain_page_size_' and 'dict_page_size_'.
+  void ConfigureForIceberg();
 
   /// Updates output partition with some summary about the written file.
   void FinalizePartitionInfo();
@@ -236,6 +244,15 @@ class HdfsParquetTableWriter : public HdfsTableWriter {
 
   /// If true, STRING values are annotated with UTF8 in Parquet metadata.
   bool string_utf8_ = false;
+
+  // File block size, set in Configure() or ConfigureForIceberg().
+  int64_t default_block_size_;
+
+  // Default plain page size, set in Configure() or ConfigureForIceberg().
+  int64_t default_plain_page_size_;
+
+  // Dictionary page size, set in Configure() or ConfigureForIceberg().
+  int64_t dict_page_size_;
 };
 
 }
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index f29423c..607a675 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -253,6 +253,10 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
     for (const TIcebergPartitionField& spec_field : spec.partition_fields) {
       iceberg_partition_names_.push_back(spec_field.field_name);
     }
+    iceberg_parquet_compression_codec_ = tdesc.icebergTable.parquet_compression_codec;
+    iceberg_parquet_row_group_size_ = tdesc.icebergTable.parquet_row_group_size;
+    iceberg_parquet_plain_page_size_ = tdesc.icebergTable.parquet_plain_page_size;
+    iceberg_parquet_dict_page_size_ = tdesc.icebergTable.parquet_dict_page_size;
   }
 }
 
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 285fe25..4cbaacb 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -343,6 +343,19 @@ class HdfsTableDescriptor : public TableDescriptor {
   const std::vector<std::string>& IcebergPartitionNames() const {
     return iceberg_partition_names_;
   }
+  const TCompressionCodec& IcebergParquetCompressionCodec() const {
+    return iceberg_parquet_compression_codec_;
+  }
+  int64_t IcebergParquetRowGroupSize() const {
+    return iceberg_parquet_row_group_size_;
+  }
+
+  int64_t IcebergParquetPlainPageSize() const {
+    return iceberg_parquet_plain_page_size_;
+  }
+  int64_t IcebergParquetDictPageSize() const {
+    return iceberg_parquet_dict_page_size_;
+  }
 
   virtual std::string DebugString() const;
 
@@ -360,6 +373,10 @@ class HdfsTableDescriptor : public TableDescriptor {
   bool is_iceberg_ = false;
   std::string iceberg_table_location_;
   std::vector<std::string> iceberg_partition_names_;
+  TCompressionCodec iceberg_parquet_compression_codec_;
+  int64_t iceberg_parquet_row_group_size_;
+  int64_t iceberg_parquet_plain_page_size_;
+  int64_t iceberg_parquet_dict_page_size_;
 };
 
 class HBaseTableDescriptor : public TableDescriptor {
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 6f3a07a..c708e17 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -88,7 +88,7 @@ enum THdfsCompression {
   LZ4_BLOCKED = 12
 }
 
-// Iceberg table file format identitied by table property 'iceberg.file_format'
+// Iceberg table file format identified by table property 'write.format.default'
 enum TIcebergFileFormat {
   PARQUET = 0
   ORC = 1
@@ -575,6 +575,15 @@ struct TIcebergTable {
   4: optional map<string,THdfsFileDesc> path_hash_to_file_descriptor
   // Iceberg snapshot id of the table
   5: optional i64 snapshot_id
+  // Iceberg 'write.parquet.compression-codec' and 'write.parquet.compression-level' table
+  // properties
+  6: optional TCompressionCodec parquet_compression_codec
+  // Iceberg 'write.parquet.row-group-size-bytes' table property
+  7: optional i64 parquet_row_group_size
+  // Iceberg 'write.parquet.page-size-bytes' and 'write.parquet.dict-size-bytes' table
+  // properties
+  8: optional i64 parquet_plain_page_size;
+  9: optional i64 parquet_dict_page_size;
 }
 
 // Represents a table or view.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
index 3091188..731ce7e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSetTblProperties.java
@@ -36,6 +36,7 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TAlterTableParams;
 import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
@@ -50,6 +51,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 
 /**
 * Represents an ALTER TABLE SET [PARTITION ('k1'='a', 'k2'='b'...)]
@@ -157,6 +159,11 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
     if (tblProperties_.containsKey(IcebergTable.ICEBERG_FILE_FORMAT)) {
       icebergTableFormatCheck(tblProperties_.get(IcebergTable.ICEBERG_FILE_FORMAT));
     }
+    icebergParquetCompressionCodecCheck();
+    icebergParquetRowGroupSizeCheck();
+    icebergParquetPageSizeCheck(IcebergTable.PARQUET_PLAIN_PAGE_SIZE, "page size");
+    icebergParquetPageSizeCheck(IcebergTable.PARQUET_DICT_PAGE_SIZE,
+        "dictionary page size");
   }
 
   private void icebergPropertyCheck(String property) throws AnalysisException {
@@ -196,6 +203,29 @@ public class AlterTableSetTblProperties extends AlterTableSetStmt {
     }
   }
 
+  private void icebergParquetCompressionCodecCheck() throws AnalysisException {
+    StringBuilder errMsg = new StringBuilder();
+    if (IcebergUtil.parseParquetCompressionCodec(false, tblProperties_, errMsg) == null) {
+      throw new AnalysisException(errMsg.toString());
+    }
+  }
+
+  private void icebergParquetRowGroupSizeCheck() throws AnalysisException {
+    StringBuilder errMsg = new StringBuilder();
+    if (IcebergUtil.parseParquetRowGroupSize(tblProperties_, errMsg) == null) {
+      throw new AnalysisException(errMsg.toString());
+    }
+  }
+
+  private void icebergParquetPageSizeCheck(String property, String descr)
+      throws AnalysisException {
+    StringBuilder errMsg = new StringBuilder();
+    if (IcebergUtil.parseParquetPageSize(getTblProperties(), property, descr,
+        errMsg) == null) {
+      throw new AnalysisException(errMsg.toString());
+    }
+  }
+
   /**
    * Check that Avro schema provided in avro.schema.url or avro.schema.literal is valid
    * Json and contains only supported Impala types. If both properties are set, then
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 0edce29..89c0f0b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -35,9 +35,12 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
@@ -54,6 +57,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
 
 /**
  * Represents a CREATE TABLE statement.
@@ -626,13 +630,21 @@ public class CreateTableStmt extends StatementBase {
         IcebergTable.ICEBERG_STORAGE_HANDLER);
 
     String fileformat = getTblProperties().get(IcebergTable.ICEBERG_FILE_FORMAT);
-    if (fileformat != null && IcebergUtil.getIcebergFileFormat(fileformat) == null) {
+    TIcebergFileFormat icebergFileFormat = IcebergUtil.getIcebergFileFormat(fileformat);
+    if (fileformat != null && icebergFileFormat == null) {
       throw new AnalysisException("Invalid fileformat for Iceberg table: " + fileformat);
     }
     if (fileformat == null || fileformat.isEmpty()) {
       putGeneratedProperty(IcebergTable.ICEBERG_FILE_FORMAT, "parquet");
     }
 
+    validateIcebergParquetCompressionCodec(icebergFileFormat);
+    validateIcebergParquetRowGroupSize(icebergFileFormat);
+    validateIcebergParquetPageSize(icebergFileFormat,
+        IcebergTable.PARQUET_PLAIN_PAGE_SIZE, "page size");
+    validateIcebergParquetPageSize(icebergFileFormat,
+        IcebergTable.PARQUET_DICT_PAGE_SIZE, "dictionary page size");
+
     // Determine the Iceberg catalog being used. The default catalog is HiveCatalog.
     String catalogStr = getTblProperties().get(IcebergTable.ICEBERG_CATALOG);
     TIcebergCatalog catalog;
@@ -644,6 +656,57 @@ public class CreateTableStmt extends StatementBase {
     validateIcebergTableProperties(catalog);
   }
 
+  private void validateIcebergParquetCompressionCodec(
+      TIcebergFileFormat icebergFileFormat) throws AnalysisException {
+    if (icebergFileFormat != TIcebergFileFormat.PARQUET) {
+      if (getTblProperties().containsKey(IcebergTable.PARQUET_COMPRESSION_CODEC)) {
+          throw new AnalysisException(IcebergTable.PARQUET_COMPRESSION_CODEC +
+              " should be set only for parquet file format");
+      }
+      if (getTblProperties().containsKey(IcebergTable.PARQUET_COMPRESSION_LEVEL)) {
+          throw new AnalysisException(IcebergTable.PARQUET_COMPRESSION_LEVEL +
+              " should be set only for parquet file format");
+      }
+    } else {
+      StringBuilder errMsg = new StringBuilder();
+      if (IcebergUtil.parseParquetCompressionCodec(true, getTblProperties(), errMsg)
+          == null) {
+        throw new AnalysisException(errMsg.toString());
+      }
+    }
+  }
+
+  private void validateIcebergParquetRowGroupSize(TIcebergFileFormat icebergFileFormat)
+      throws AnalysisException {
+    if (getTblProperties().containsKey(IcebergTable.PARQUET_ROW_GROUP_SIZE)) {
+      if (icebergFileFormat != TIcebergFileFormat.PARQUET) {
+        throw new AnalysisException(IcebergTable.PARQUET_ROW_GROUP_SIZE +
+            " should be set only for parquet file format");
+      }
+    }
+
+    StringBuilder errMsg = new StringBuilder();
+    if (IcebergUtil.parseParquetRowGroupSize(getTblProperties(), errMsg) == null) {
+      throw new AnalysisException(errMsg.toString());
+    }
+  }
+
+  private void validateIcebergParquetPageSize(TIcebergFileFormat icebergFileFormat,
+      String pageSizeTblProp, String descr) throws AnalysisException {
+    if (getTblProperties().containsKey(pageSizeTblProp)) {
+      if (icebergFileFormat != TIcebergFileFormat.PARQUET) {
+        throw new AnalysisException(pageSizeTblProp +
+            " should be set only for parquet file format");
+      }
+    }
+
+    StringBuilder errMsg = new StringBuilder();
+    if (IcebergUtil.parseParquetPageSize(getTblProperties(), pageSizeTblProp, descr,
+        errMsg) == null) {
+      throw new AnalysisException(errMsg.toString());
+    }
+  }
+
   private void validateIcebergTableProperties(TIcebergCatalog catalog)
       throws AnalysisException {
     // Metadata location is only used by HiveCatalog, but we shouldn't allow setting this
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 9191f91..d05a4fc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -43,6 +43,8 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TCompressionCodec;
+import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -56,6 +58,7 @@ import org.apache.impala.util.ListMap;
 import org.apache.impala.util.TResultRowBuilder;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
 
 /**
  * Frontend interface for interacting with an Iceberg-backed table.
@@ -90,6 +93,26 @@ public interface FeIcebergTable extends FeFsTable {
   TIcebergFileFormat getIcebergFileFormat();
 
   /**
+   * Return iceberg parquet compression codec from table properties
+   */
+  TCompressionCodec getIcebergParquetCompressionCodec();
+
+  /**
+   * Return iceberg parquet row group size in bytes from table properties
+   */
+  long getIcebergParquetRowGroupSize();
+
+  /**
+   * Return iceberg parquet plain page size in bytes from table properties
+   */
+  long getIcebergParquetPlainPageSize();
+
+  /**
+   * Return iceberg parquet dictionary page size in bytes from table properties
+   */
+  long getIcebergParquetDictPageSize();
+
+  /**
    * Return the table location of Iceberg table
    * When using 'hadoop.tables', this value is a normal table location
    * When using 'hadoop.catalog', this value is 'iceberg.catalog_location' + identifier
@@ -298,11 +321,73 @@ public interface FeIcebergTable extends FeFsTable {
      */
     public static TIcebergFileFormat getIcebergFileFormat(
         org.apache.hadoop.hive.metastore.api.Table msTable) {
-      TIcebergFileFormat fileFormat = IcebergUtil.getIcebergFileFormat(
-          msTable.getParameters().get(IcebergTable.ICEBERG_FILE_FORMAT));
+      TIcebergFileFormat fileFormat = null;
+      Map<String, String> params = msTable.getParameters();
+      if (params.containsKey(IcebergTable.ICEBERG_FILE_FORMAT)) {
+        fileFormat = IcebergUtil.getIcebergFileFormat(
+            params.get(IcebergTable.ICEBERG_FILE_FORMAT));
+      } else {
+        // Accept "iceberg.file_format" for backward compatibility.
+        fileFormat = IcebergUtil.getIcebergFileFormat(params.get("iceberg.file_format"));
+      }
       return fileFormat == null ? TIcebergFileFormat.PARQUET : fileFormat;
     }
 
+    /**
+     * Get iceberg parquet compression codec from hms table properties
+     */
+    public static TCompressionCodec getIcebergParquetCompressionCodec(
+        org.apache.hadoop.hive.metastore.api.Table msTable) {
+      THdfsCompression codec = IcebergUtil.getIcebergParquetCompressionCodec(
+          msTable.getParameters().get(IcebergTable.PARQUET_COMPRESSION_CODEC));
+      if (codec == null) codec = IcebergTable.DEFAULT_PARQUET_COMPRESSION_CODEC;
+      TCompressionCodec compression = new TCompressionCodec(codec);
+
+      // Compression level is interesting only if ZSTD codec is used.
+      if (codec == THdfsCompression.ZSTD) {
+        int clevel = IcebergTable.DEFAULT_PARQUET_ZSTD_COMPRESSION_LEVEL;
+
+        String clevelTblProp = msTable.getParameters().get(
+            IcebergTable.PARQUET_COMPRESSION_LEVEL);
+        if (clevelTblProp != null) {
+          Integer cl = Ints.tryParse(clevelTblProp);
+          if (cl != null && cl >= IcebergTable.MIN_PARQUET_COMPRESSION_LEVEL &&
+              cl <= IcebergTable.MAX_PARQUET_COMPRESSION_LEVEL) {
+            clevel = cl;
+          }
+        }
+        compression.setCompression_level(clevel);
+      }
+
+      return compression;
+    }
+
+    /**
+     * Get iceberg parquet row group size from hms table properties
+     */
+    public static long getIcebergParquetRowGroupSize(
+        org.apache.hadoop.hive.metastore.api.Table msTable) {
+      return IcebergUtil.getIcebergParquetRowGroupSize(
+          msTable.getParameters().get(IcebergTable.PARQUET_ROW_GROUP_SIZE));
+    }
+
+    /**
+     * Get iceberg parquet plain page size from hms table properties
+     */
+    public static long getIcebergParquetPlainPageSize(
+        org.apache.hadoop.hive.metastore.api.Table msTable) {
+      return IcebergUtil.getIcebergParquetPageSize(
+          msTable.getParameters().get(IcebergTable.PARQUET_PLAIN_PAGE_SIZE));
+    }
+
+    /**
+     * Get iceberg parquet dictionary page size from hms table properties
+     */
+    public static long getIcebergParquetDictPageSize(
+        org.apache.hadoop.hive.metastore.api.Table msTable) {
+      return IcebergUtil.getIcebergParquetPageSize(
+          msTable.getParameters().get(IcebergTable.PARQUET_DICT_PAGE_SIZE));
+    }
 
     public static TIcebergTable getTIcebergTable(FeIcebergTable icebergTable) {
       TIcebergTable tIcebergTable = new TIcebergTable();
@@ -320,6 +405,14 @@ public interface FeIcebergTable extends FeFsTable {
           entry.getValue().toThrift());
       }
       tIcebergTable.setSnapshot_id(icebergTable.snapshotId());
+      tIcebergTable.setParquet_compression_codec(
+          icebergTable.getIcebergParquetCompressionCodec());
+      tIcebergTable.setParquet_row_group_size(
+          icebergTable.getIcebergParquetRowGroupSize());
+      tIcebergTable.setParquet_plain_page_size(
+          icebergTable.getIcebergParquetPlainPageSize());
+      tIcebergTable.setParquet_dict_page_size(
+          icebergTable.getIcebergParquetDictPageSize());
       return tIcebergTable;
     }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index dca9400..541690f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -35,6 +35,8 @@ import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TCompressionCodec;
+import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -68,7 +70,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
       "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler";
 
   // Iceberg file format key in tblproperties
-  public static final String ICEBERG_FILE_FORMAT = "iceberg.file_format";
+  public static final String ICEBERG_FILE_FORMAT = "write.format.default";
 
   // Iceberg catalog type key in tblproperties
   public static final String ICEBERG_CATALOG = "iceberg.catalog";
@@ -86,12 +88,61 @@ public class IcebergTable extends Table implements FeIcebergTable {
   // table metadata. This property is only valid for tables in 'hive.catalog'.
   public static final String METADATA_LOCATION = "metadata_location";
 
+  // Parquet compression codec and compression level table properties.
+  public static final String PARQUET_COMPRESSION_CODEC =
+      "write.parquet.compression-codec";
+  public static final String PARQUET_COMPRESSION_LEVEL =
+      "write.parquet.compression-level";
+
+  // Default parquet compression codec, used when the table property is not set.
+  public static final THdfsCompression DEFAULT_PARQUET_COMPRESSION_CODEC =
+      THdfsCompression.SNAPPY;
+  // Default parquet compression level (used with ZSTD codec).
+  public static final int DEFAULT_PARQUET_ZSTD_COMPRESSION_LEVEL = 3;
+  // Valid range (inclusive) for parquet compression level.
+  public static final int MIN_PARQUET_COMPRESSION_LEVEL = 1;
+  public static final int MAX_PARQUET_COMPRESSION_LEVEL = 22;
+
+  // Parquet row group size table property.
+  public static final String PARQUET_ROW_GROUP_SIZE =
+      "write.parquet.row-group-size-bytes";
+  // 0 marks an unset or invalid table property, which should be ignored.
+  public static final long UNSET_PARQUET_ROW_GROUP_SIZE = 0;
+  // Valid range for parquet row group size is [8MB, 2047MB] (see HDFS_MIN_FILE_SIZE
+  // in hdfs-parquet-table-writer.h). NOTE: int arithmetic; 2048MB would overflow.
+  public static final long MIN_PARQUET_ROW_GROUP_SIZE = 8 * 1024 * 1024;
+  public static final long MAX_PARQUET_ROW_GROUP_SIZE = 2047 * 1024 * 1024;
+
+  // Parquet plain page size table property.
+  public static final String PARQUET_PLAIN_PAGE_SIZE = "write.parquet.page-size-bytes";
+  // Parquet dictionary page size table property.
+  public static final String PARQUET_DICT_PAGE_SIZE = "write.parquet.dict-size-bytes";
+  // 0 marks an unset or invalid table property, which should be ignored.
+  public static final long UNSET_PARQUET_PAGE_SIZE = 0;
+  // Valid range (inclusive) for parquet plain and dictionary page size is [64KB, 1GB]
+  // (see DEFAULT_DATA_PAGE_SIZE and MAX_DATA_PAGE_SIZE defined in
+  // hdfs-parquet-table-writer.h)
+  public static final long MIN_PARQUET_PAGE_SIZE = 64 * 1024;
+  public static final long MAX_PARQUET_PAGE_SIZE = 1024 * 1024 * 1024;
+
   // Iceberg catalog type dependend on table properties
   private TIcebergCatalog icebergCatalog_;
 
   // Iceberg file format dependend on table properties
   private TIcebergFileFormat icebergFileFormat_;
 
+  // Parquet compression codec (plus level for ZSTD) derived from the table properties
+  private TCompressionCodec icebergParquetCompressionCodec_;
+
+  // Parquet row group size from the table property; 0 if unset or invalid
+  private long icebergParquetRowGroupSize_;
+
+  // Parquet plain page size from the table property; 0 if unset or invalid
+  private long icebergParquetPlainPageSize_;
+
+  // Parquet dictionary page size from the table property; 0 if unset or invalid
+  private long icebergParquetDictPageSize_;
+
   // The iceberg file system table location
   private String icebergTableLocation_;
 
@@ -119,6 +170,10 @@ public class IcebergTable extends Table implements FeIcebergTable {
     icebergTableLocation_ = msTable.getSd().getLocation();
     icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTable);
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTable);
+    icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTable);
+    icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTable);
+    icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTable);
+    icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTable);
     hdfsTable_ = new HdfsTable(msTable, db, name, owner);
   }
 
@@ -192,6 +247,26 @@ public class IcebergTable extends Table implements FeIcebergTable {
   }
 
   @Override
+  public TCompressionCodec getIcebergParquetCompressionCodec() {
+    return icebergParquetCompressionCodec_;
+  }
+
+  @Override
+  public long getIcebergParquetRowGroupSize() {
+    return icebergParquetRowGroupSize_;
+  }
+
+  @Override
+  public long getIcebergParquetPlainPageSize() {
+    return icebergParquetPlainPageSize_;
+  }
+
+  @Override
+  public long getIcebergParquetDictPageSize() {
+    return icebergParquetDictPageSize_;
+  }
+
+  @Override
   public String getIcebergTableLocation() {
     return icebergTableLocation_;
   }
@@ -263,6 +338,10 @@ public class IcebergTable extends Table implements FeIcebergTable {
         // Loading hdfs table after loaded schema from Iceberg,
         // in case we create external Iceberg table skipping column info in sql.
         icebergFileFormat_ = Utils.getIcebergFileFormat(msTbl);
+        icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTbl);
+        icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTbl);
+        icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl);
+        icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl);
         hdfsTable_
             .load(false, msClient, msTable_, true, true, false, null, null,null, reason);
         pathHashToFileDescMap_ = Utils.loadAllPartition(this);
@@ -330,6 +409,10 @@ public class IcebergTable extends Table implements FeIcebergTable {
     super.loadFromThrift(thriftTable);
     TIcebergTable ticeberg = thriftTable.getIceberg_table();
     icebergTableLocation_ = ticeberg.getTable_location();
+    icebergParquetCompressionCodec_ = ticeberg.getParquet_compression_codec();
+    icebergParquetRowGroupSize_ = ticeberg.getParquet_row_group_size();
+    icebergParquetPlainPageSize_ = ticeberg.getParquet_plain_page_size();
+    icebergParquetDictPageSize_ = ticeberg.getParquet_dict_page_size();
     partitionSpecs_ = loadPartitionBySpecsFromThrift(ticeberg.getPartition_spec());
     defaultPartitionSpecId_ = ticeberg.getDefault_partition_spec_id();
     pathHashToFileDescMap_ = loadFileDescFromThrift(
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 205aac7..7279975 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -54,6 +54,7 @@ import org.apache.impala.catalog.StructType;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsStorageDescriptor;
 import org.apache.impala.thrift.THdfsTable;
@@ -73,6 +74,10 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
   private Schema iceSchema_;
   private List<IcebergPartitionSpec> partitionSpecs_ = new ArrayList<>();
   private TIcebergFileFormat icebergFileFormat_;
+  private TCompressionCodec icebergParquetCompressionCodec_;
+  private long icebergParquetRowGroupSize_;
+  private long icebergParquetPlainPageSize_;
+  private long icebergParquetDictPageSize_;
   private TIcebergCatalog icebergCatalog_;
   private String icebergTableLocation_;
   private String icebergCatalogLocation_;
@@ -88,6 +93,10 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
     icebergCatalog_ = IcebergUtil.getTIcebergCatalog(msTbl);
     setLocations();
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTbl);
+    icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTbl);
+    icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTbl);
+    icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTbl);
+    icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTbl);
     hdfsSd_ = HdfsStorageDescriptor.fromStorageDescriptor(name_, msTable_.getSd());
   }
 
@@ -186,6 +195,26 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
   }
 
   @Override
+  public TCompressionCodec getIcebergParquetCompressionCodec() {
+    return icebergParquetCompressionCodec_;
+  }
+
+  @Override
+  public long getIcebergParquetRowGroupSize() {
+    return icebergParquetRowGroupSize_;
+  }
+
+  @Override
+  public long getIcebergParquetPlainPageSize() {
+    return icebergParquetPlainPageSize_;
+  }
+
+  @Override
+  public long getIcebergParquetDictPageSize() {
+    return icebergParquetDictPageSize_;
+  }
+
+  @Override
   public String getIcebergTableLocation() {
     return icebergTableLocation_;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 0de145e..9f7b83a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TIcebergCatalog;
@@ -55,6 +56,10 @@ import com.google.errorprone.annotations.Immutable;
 public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   private TableParams tableParams_;
   private TIcebergFileFormat icebergFileFormat_;
+  private TCompressionCodec icebergParquetCompressionCodec_;
+  private long icebergParquetRowGroupSize_;
+  private long icebergParquetPlainPageSize_;
+  private long icebergParquetDictPageSize_;
   private List<IcebergPartitionSpec> partitionSpecs_;
   private int defaultPartitionSpecId_;
   private Map<String, FileDescriptor> pathHashToFileDescMap_;
@@ -109,6 +114,10 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
           (Exception)e);
     }
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTable);
+    icebergParquetCompressionCodec_ = Utils.getIcebergParquetCompressionCodec(msTable);
+    icebergParquetRowGroupSize_ = Utils.getIcebergParquetRowGroupSize(msTable);
+    icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTable);
+    icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTable);
   }
 
   static void validateColumns(List<Column> impalaCols, List<FieldSchema> hmsCols) {
@@ -125,6 +134,26 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Override
+  public TCompressionCodec getIcebergParquetCompressionCodec() {
+    return icebergParquetCompressionCodec_;
+  }
+
+  @Override
+  public long getIcebergParquetRowGroupSize() {
+    return icebergParquetRowGroupSize_;
+  }
+
+  @Override
+  public long getIcebergParquetPlainPageSize() {
+    return icebergParquetPlainPageSize_;
+  }
+
+  @Override
+  public long getIcebergParquetDictPageSize() {
+    return icebergParquetDictPageSize_;
+  }
+
+  @Override
   public String getIcebergTableLocation() {
     return tableParams_.icebergTableLocation_;
   }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 5c4ff03..6829e0e 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -31,9 +31,12 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
 
 import org.apache.impala.catalog.IcebergStructField;
 import org.apache.impala.common.Pair;
@@ -70,7 +73,9 @@ import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
 import org.apache.impala.catalog.iceberg.IcebergCatalogs;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.THdfsCompression;
 import org.apache.impala.thrift.THdfsFileFormat;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
@@ -308,6 +313,47 @@ public class IcebergUtil {
     return null;
   }
 
+  /**
+   * Map from lower-case parquet compression codec names to a compression type. The
+   * list of supported codecs was taken from hdfs-parquet-table-writer.cc.
+   */
+  public static final ImmutableMap<String, THdfsCompression> PARQUET_CODEC_MAP =
+      ImmutableMap.<String, THdfsCompression>builder().
+          put("none", THdfsCompression.NONE).
+          put("gzip", THdfsCompression.GZIP).
+          put("snappy", THdfsCompression.SNAPPY).
+          put("lz4", THdfsCompression.LZ4).
+          put("zstd", THdfsCompression.ZSTD).
+          build();
+
+  /** Locale-independent, case-insensitive lookup: default if null, null if unknown. */
+  public static THdfsCompression getIcebergParquetCompressionCodec(String codec) {
+    if (codec == null) return IcebergTable.DEFAULT_PARQUET_COMPRESSION_CODEC;
+    return PARQUET_CODEC_MAP.get(codec.toLowerCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * Parses 'rowGroupSize'; yields the UNSET value if null, malformed or out of range.
+   */
+  public static long getIcebergParquetRowGroupSize(String rowGroupSize) {
+    Long parsed = rowGroupSize == null ? null : Longs.tryParse(rowGroupSize);
+    boolean inRange = parsed != null &&
+        parsed >= IcebergTable.MIN_PARQUET_ROW_GROUP_SIZE &&
+        parsed <= IcebergTable.MAX_PARQUET_ROW_GROUP_SIZE;
+    return inRange ? parsed : IcebergTable.UNSET_PARQUET_ROW_GROUP_SIZE;
+  }
+
+  /**
+   * Parses 'pageSize'; yields the UNSET value if null, malformed or out of range.
+   */
+  public static long getIcebergParquetPageSize(String pageSize) {
+    Long parsed = pageSize == null ? null : Longs.tryParse(pageSize);
+    boolean inRange = parsed != null &&
+        parsed >= IcebergTable.MIN_PARQUET_PAGE_SIZE &&
+        parsed <= IcebergTable.MAX_PARQUET_PAGE_SIZE;
+    return inRange ? parsed : IcebergTable.UNSET_PARQUET_PAGE_SIZE;
+  }
+
   public static IcebergPartitionTransform getPartitionTransform(
       PartitionField field, HashMap<String, Integer> transformParams)
       throws TableLoadingException {
@@ -640,4 +686,99 @@ public class IcebergUtil {
         ZoneOffset.UTC);
     return (int)ChronoUnit.HOURS.between(EPOCH, datetime);
   }
+
+  /**
+   * Validates the parquet compression codec/level table properties. Returns a
+   * TCompressionCodec holding only the values actually present in 'tblProperties',
+   * or null after appending the reason to 'errMsg' if a value is invalid.
+   */
+  public static TCompressionCodec parseParquetCompressionCodec(
+      boolean onCreateTbl, Map<String, String> tblProperties, StringBuilder errMsg) {
+    String codecName = tblProperties.get(IcebergTable.PARQUET_COMPRESSION_CODEC);
+    THdfsCompression codec = getIcebergParquetCompressionCodec(codecName);
+    if (codec == null) {
+      errMsg.append("Invalid parquet compression codec for Iceberg table: ")
+          .append(codecName);
+      return null;
+    }
+
+    TCompressionCodec res = new TCompressionCodec();
+    if (tblProperties.containsKey(IcebergTable.PARQUET_COMPRESSION_CODEC)) {
+      res.setCodec(codec);
+    }
+    if (!tblProperties.containsKey(IcebergTable.PARQUET_COMPRESSION_LEVEL)) return res;
+    // A level is given: when 'onCreateTbl' is set it is only allowed with ZSTD.
+    if (onCreateTbl && codec != THdfsCompression.ZSTD) {
+      errMsg.append("Parquet compression level cannot be set for codec ").append(codec)
+          .append(". Only ZSTD codec supports compression level table property.");
+      return null;
+    }
+    String levelStr = tblProperties.get(IcebergTable.PARQUET_COMPRESSION_LEVEL);
+    Integer level = Ints.tryParse(levelStr);
+    if (level == null) {
+      errMsg.append("Invalid parquet compression level for Iceberg table: ")
+          .append(levelStr);
+      return null;
+    }
+    if (level < IcebergTable.MIN_PARQUET_COMPRESSION_LEVEL ||
+        level > IcebergTable.MAX_PARQUET_COMPRESSION_LEVEL) {
+      errMsg.append("Parquet compression level for Iceberg table should fall in " +
+          "the range of [").append(IcebergTable.MIN_PARQUET_COMPRESSION_LEVEL)
+          .append("..").append(IcebergTable.MAX_PARQUET_COMPRESSION_LEVEL).append("]");
+      return null;
+    }
+    res.setCompression_level(level);
+    return res;
+  }
+
+  // Validates the parquet row group size table property. Returns the UNSET value if
+  // the property is absent, or null (reason appended to 'errMsg') if it is invalid.
+  public static Long parseParquetRowGroupSize(Map<String, String> tblProperties,
+      StringBuilder errMsg) {
+    if (!tblProperties.containsKey(IcebergTable.PARQUET_ROW_GROUP_SIZE)) {
+      return IcebergTable.UNSET_PARQUET_ROW_GROUP_SIZE;
+    }
+    String raw = tblProperties.get(IcebergTable.PARQUET_ROW_GROUP_SIZE);
+    Long size = Longs.tryParse(raw);
+    if (size == null) {
+      errMsg.append("Invalid parquet row group size for Iceberg table: ").append(raw);
+      return null;
+    }
+    if (size < IcebergTable.MIN_PARQUET_ROW_GROUP_SIZE ||
+        size > IcebergTable.MAX_PARQUET_ROW_GROUP_SIZE) {
+      errMsg.append("Parquet row group size for Iceberg table should ")
+          .append("fall in the range of [")
+          .append(IcebergTable.MIN_PARQUET_ROW_GROUP_SIZE).append("..")
+          .append(IcebergTable.MAX_PARQUET_ROW_GROUP_SIZE).append("]");
+      return null;
+    }
+    return size;
+  }
+
+  /**
+   * Validates the page size table property 'property' ('descr' names it in errors).
+   * Returns UNSET if absent, or null (reason appended to 'errMsg') if invalid.
+   */
+  public static Long parseParquetPageSize(Map<String, String> tblProperties,
+      String property, String descr, StringBuilder errMsg) {
+    if (!tblProperties.containsKey(property)) {
+      return IcebergTable.UNSET_PARQUET_PAGE_SIZE;
+    }
+    String raw = tblProperties.get(property);
+    Long size = Longs.tryParse(raw);
+    if (size == null) {
+      errMsg.append("Invalid parquet ").append(descr)
+          .append(" for Iceberg table: ").append(raw);
+      return null;
+    }
+    if (size < IcebergTable.MIN_PARQUET_PAGE_SIZE ||
+        size > IcebergTable.MAX_PARQUET_PAGE_SIZE) {
+      errMsg.append("Parquet ").append(descr)
+          .append(" for Iceberg table should fall in the range of [")
+          .append(IcebergTable.MIN_PARQUET_PAGE_SIZE).append("..")
+          .append(IcebergTable.MAX_PARQUET_PAGE_SIZE).append("]");
+      return null;
+    }
+    return size;
+  }
 }
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3c0e900..b3ce714 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2972,7 +2972,7 @@ iceberg_partitioned
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
 LOCATION '/test-warehouse/iceberg_test/iceberg_partitioned'
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables');
+TBLPROPERTIES('write.format.default'='parquet', 'iceberg.catalog'='hadoop.tables');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/iceberg_partitioned /test-warehouse/iceberg_test/
@@ -2985,7 +2985,7 @@ iceberg_non_partitioned
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
 LOCATION '/test-warehouse/iceberg_test/iceberg_non_partitioned'
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables');
+TBLPROPERTIES('write.format.default'='parquet', 'iceberg.catalog'='hadoop.tables');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/iceberg_non_partitioned /test-warehouse/iceberg_test/
@@ -2997,7 +2997,7 @@ hadoop_catalog_test_external
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+TBLPROPERTIES('write.format.default'='parquet', 'iceberg.catalog'='hadoop.catalog',
 'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test',
 'iceberg.table_identifier'='functional_parquet.hadoop_catalog_test');
 ---- DEPENDENT_LOAD
@@ -3011,7 +3011,10 @@ iceberg_partitioned_orc_external
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc', 'iceberg.table_identifier'='functional_parquet.iceberg_partitioned_orc');
+TBLPROPERTIES('write.format.default'='orc',
+'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc',
+'iceberg.table_identifier'='functional_parquet.iceberg_partitioned_orc');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/iceberg_partitioned_orc /test-warehouse/iceberg_test/hadoop_catalog/
@@ -3023,7 +3026,7 @@ complextypestbl_iceberg_orc
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.catalog',
+TBLPROPERTIES('write.format.default'='orc', 'iceberg.catalog'='hadoop.catalog',
               'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog',
               'iceberg.table_identifier'='ice.complextypestbl_iceberg_orc');
 ---- DEPENDENT_LOAD
@@ -3037,7 +3040,7 @@ iceberg_resolution_test_external
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+TBLPROPERTIES('write.format.default'='parquet', 'iceberg.catalog'='hadoop.catalog',
 'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog/iceberg_resolution_test',
 'iceberg.table_identifier'='functional_parquet.iceberg_resolution_test');
 ---- DEPENDENT_LOAD
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
index ed64e55..504341d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-alter.test
@@ -123,18 +123,18 @@ Could not resolve table reference: 'iceberg_rename'
 ---- QUERY
 CREATE TABLE iceberg_changing_fileformats (i int)
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='orc');
+TBLPROPERTIES('write.format.default'='orc');
 DESCRIBE FORMATTED iceberg_changing_fileformats;
 ---- RESULTS: VERIFY_IS_SUBSET
-'','iceberg.file_format ','orc                 '
+'','write.format.default','orc                 '
 ---- TYPES
 string, string, string
 ====
 ---- QUERY
-ALTER TABLE iceberg_changing_fileformats set TBLPROPERTIES('iceberg.file_format'='parquet');
+ALTER TABLE iceberg_changing_fileformats set TBLPROPERTIES('write.format.default'='parquet');
 DESCRIBE FORMATTED iceberg_changing_fileformats;
 ---- RESULTS: VERIFY_IS_SUBSET
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 ---- TYPES
 string, string, string
 ====
@@ -147,14 +147,14 @@ SELECT * FROM iceberg_changing_fileformats;
 INT
 ====
 ---- QUERY
-ALTER TABLE iceberg_changing_fileformats set TBLPROPERTIES('iceberg.file_format'='ORC');
+ALTER TABLE iceberg_changing_fileformats set TBLPROPERTIES('write.format.default'='ORC');
 ---- CATCH
 Attempt to set Iceberg data file format to ORC
 ====
 ---- QUERY
 DESCRIBE FORMATTED iceberg_changing_fileformats;
 ---- RESULTS: VERIFY_IS_SUBSET
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 ---- TYPES
 string, string, string
 ====
@@ -285,3 +285,43 @@ DESCRIBE ice_alter_cols;
 ---- TYPES
 STRING,STRING,STRING,STRING
 ====
+---- QUERY
+CREATE TABLE iceberg_changing_parq_tblprops (i int)
+STORED AS ICEBERG
+TBLPROPERTIES (
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072'
+);
+DESCRIBE FORMATTED iceberg_changing_parq_tblprops;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.format.default','parquet             '
+'','write.parquet.row-group-size-bytes','134217728           '
+'','write.parquet.compression-codec','zstd                '
+'','write.parquet.compression-level','12                  '
+'','write.parquet.page-size-bytes','65536               '
+'','write.parquet.dict-size-bytes','131072              '
+---- TYPES
+string, string, string
+====
+---- QUERY
+ALTER TABLE iceberg_changing_parq_tblprops set TBLPROPERTIES(
+    'write.parquet.row-group-size-bytes'='268435456',
+    'write.parquet.compression-codec'='snappy',
+    'write.parquet.compression-level'='11',
+    'write.parquet.page-size-bytes'='131072',
+    'write.parquet.dict-size-bytes'='65536'
+);
+DESCRIBE FORMATTED iceberg_changing_parq_tblprops;
+---- RESULTS: VERIFY_IS_SUBSET
+'','write.format.default','parquet             '
+'','write.parquet.row-group-size-bytes','268435456           '
+'','write.parquet.compression-codec','snappy              '
+'','write.parquet.compression-level','11                  '
+'','write.parquet.page-size-bytes','131072              '
+'','write.parquet.dict-size-bytes','65536               '
+---- TYPES
+string, string, string
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
index de4ab34..3df404e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-catalogs.test
@@ -12,7 +12,7 @@ TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat');
 DESCRIBE FORMATTED iceberg_hadoop_catalogs;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/$DATABASE/iceberg_hadoop_catalogs','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','ice_hadoop_cat      '
 ---- TYPES
 string, string, string
@@ -29,7 +29,7 @@ TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat',
 DESCRIBE FORMATTED iceberg_hadoop_catalogs_with_id;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/org/db/tbl','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','ice_hadoop_cat      '
 '','iceberg.table_identifier','org.db.tbl          '
 '','name                ','org.db.tbl          '
@@ -63,7 +63,7 @@ TBLPROPERTIES('iceberg.catalog'='ice_hadoop_cat',
 DESCRIBE FORMATTED iceberg_hadoop_cat_with_id_ext;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/ice_hadoop_cat/org/db/tbl','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','ice_hadoop_cat      '
 '','iceberg.table_identifier','org.db.tbl          '
 '','name                ','org.db.tbl          '
@@ -103,7 +103,7 @@ TBLPROPERTIES('iceberg.catalog'='ice_hive_cat');
 DESCRIBE FORMATTED iceberg_hive_catalogs;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','ice_hive_cat        '
 ---- TYPES
 string, string, string
@@ -135,7 +135,7 @@ TBLPROPERTIES('iceberg.catalog'='ice_hive_cat',
 DESCRIBE FORMATTED iceberg_hive_catalogs_ext;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/iceberg_hive_catalogs','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','ice_hive_cat        '
 '','iceberg.table_identifier','$DATABASE.iceberg_hive_catalogs'
 '','name                ','$DATABASE.iceberg_hive_catalogs'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
index 5486554..67ad59f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
@@ -266,7 +266,7 @@ DESCRIBE FORMATTED iceberg_hadoop_cat_query;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test/$DATABASE/iceberg_hadoop_cat_query','NULL'
 '','iceberg.catalog_location','/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
@@ -283,7 +283,7 @@ DESCRIBE FORMATTED iceberg_hadoop_cat_with_ident;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test/org/db/tbl','NULL'
 '','iceberg.catalog_location','/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
@@ -305,7 +305,7 @@ DESCRIBE FORMATTED iceberg_hadoop_cat_with_ident_ext;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test/org/db/tbl','NULL'
 '','iceberg.catalog_location','/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
@@ -335,7 +335,7 @@ PARTITIONED BY SPEC
   DAY(register_time)
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='orc','iceberg.catalog'='hadoop.catalog',
+TBLPROPERTIES('write.format.default'='orc','iceberg.catalog'='hadoop.catalog',
 'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test');
 ---- RESULTS
 'Table has been created.'
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
index 9075765..a05b440 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-insert.test
@@ -269,3 +269,137 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/custom_hive_cat/data/.*.0.parq'
 ---- TYPES
 STRING, STRING, STRING
 ====
+---- QUERY
+# Create a table whose columns are a subset of the 'alltypes' table, i.e. it
+# only contains the data types supported by Iceberg.
+create table iceberg_alltypes_parq_tblprop(
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_col DATE,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+stored as iceberg
+tblproperties('write.format.default'='parquet',
+    'write.parquet.row-group-size-bytes'='8388608',
+    'write.parquet.compression-codec'='gzip',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='1073741824'
+);
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop set tblproperties (
+    'write.parquet.row-group-size-bytes'='536870912',
+    'write.parquet.compression-codec'='none',
+    'write.parquet.page-size-bytes'='134217728',
+    'write.parquet.dict-size-bytes'='805306368');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop set tblproperties (
+    'write.parquet.row-group-size-bytes'='1073741824',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='1',
+    'write.parquet.page-size-bytes'='402653184',
+    'write.parquet.dict-size-bytes'='536870912');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop set tblproperties (
+    'write.parquet.row-group-size-bytes'='1610612736',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='13',
+    'write.parquet.page-size-bytes'='536870912',
+    'write.parquet.dict-size-bytes'='402653184');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop set tblproperties (
+    'write.parquet.row-group-size-bytes'='1879048192',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='18',
+    'write.parquet.page-size-bytes'='805306368',
+    'write.parquet.dict-size-bytes'='134217728');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop set tblproperties (
+    'write.parquet.row-group-size-bytes'='2146435072',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='22',
+    'write.parquet.page-size-bytes'='1073741824',
+    'write.parquet.dict-size-bytes'='65536');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+alter table iceberg_alltypes_parq_tblprop unset tblproperties (
+    'write.parquet.row-group-size-bytes',
+    'write.parquet.compression-codec',
+    'write.parquet.compression-level',
+    'write.parquet.page-size-bytes',
+    'write.parquet.dict-size-bytes');
+====
+---- QUERY
+insert into iceberg_alltypes_parq_tblprop
+select id, bool_col, int_col, bigint_col, float_col, double_col,
+CAST(date_string_col as date FORMAT 'MM/DD/YY'), string_col, timestamp_col
+from functional.alltypes;
+---- RESULTS
+: 7300
+====
+---- QUERY
+select count(*) from iceberg_alltypes_parq_tblprop;
+---- RESULTS
+51100
+---- TYPES
+BIGINT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index d287c78..faa7506 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -329,14 +329,14 @@ Syntax error in line
 ---- QUERY
 CREATE TABLE iceberg_wrong_fileformat (i int)
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='or');
+TBLPROPERTIES('write.format.default'='or');
 ---- CATCH
 Invalid fileformat for Iceberg table: or
 ====
 ---- QUERY
 CREATE TABLE iceberg_set_wrong_fileformat (i int)
 STORED AS ICEBERG;
-ALTER TABLE iceberg_set_wrong_fileformat SET TBLPROPERTIES ('iceberg.file_format'='parq');
+ALTER TABLE iceberg_set_wrong_fileformat SET TBLPROPERTIES ('write.format.default'='parq');
 ---- CATCH
 Invalid fileformat for Iceberg table: parq
 ====
@@ -347,3 +347,215 @@ STORED AS ICEBERG;
 ---- CATCH
 Unsupported iceberg partition type: WRONG
 ====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_row_group_size1 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES(
+    'write.format.default'='orc',
+    'write.parquet.row-group-size-bytes'='134217728');
+---- CATCH
+write.parquet.row-group-size-bytes should be set only for parquet file format
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_row_group_size2 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='8388607');
+---- CATCH
+Parquet row group size for Iceberg table should fall in the range of [8388608..2146435072]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_row_group_size2 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_row_group_size2 SET
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='8388607');
+---- CATCH
+Parquet row group size for Iceberg table should fall in the range of [8388608..2146435072]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_row_group_size3 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='134217728a');
+---- CATCH
+Invalid parquet row group size for Iceberg table: 134217728a
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_row_group_size3 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_row_group_size3 SET
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='134217728a');
+---- CATCH
+Invalid parquet row group size for Iceberg table: 134217728a
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_row_group_size4 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='2146435073');
+---- CATCH
+Parquet row group size for Iceberg table should fall in the range of [8388608..2146435072]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_row_group_size4 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_row_group_size4 SET
+TBLPROPERTIES('write.parquet.row-group-size-bytes'='2146435073');
+---- CATCH
+Parquet row group size for Iceberg table should fall in the range of [8388608..2146435072]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_page_size1 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.format.default'='orc', 'write.parquet.page-size-bytes'='65536');
+---- CATCH
+write.parquet.page-size-bytes should be set only for parquet file format
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_page_size2 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.page-size-bytes'='65535');
+---- CATCH
+Parquet page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_page_size2 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_page_size2 SET
+TBLPROPERTIES('write.parquet.page-size-bytes'='65535');
+---- CATCH
+Parquet page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_page_size3 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.page-size-bytes'='655 36');
+---- CATCH
+Invalid parquet page size for Iceberg table: 655 36
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_page_size3 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_page_size3 SET
+TBLPROPERTIES('write.parquet.page-size-bytes'='655 36');
+---- CATCH
+Invalid parquet page size for Iceberg table: 655 36
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_page_size4 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.page-size-bytes'='1073741825');
+---- CATCH
+Parquet page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_page_size4 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_page_size4 SET
+TBLPROPERTIES('write.parquet.page-size-bytes'='1073741825');
+---- CATCH
+Parquet page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_dict_size1 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.format.default'='orc', 'write.parquet.dict-size-bytes'='65536');
+---- CATCH
+write.parquet.dict-size-bytes should be set only for parquet file format
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_dict_size2 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.dict-size-bytes'='65535');
+---- CATCH
+Parquet dictionary page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_dict_size2 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_dict_size2 SET
+TBLPROPERTIES('write.parquet.dict-size-bytes'='65535');
+---- CATCH
+Parquet dictionary page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_dict_size3 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.dict-size-bytes'='655 36');
+---- CATCH
+Invalid parquet dictionary page size for Iceberg table: 655 36
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_dict_size3 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_dict_size3 SET
+TBLPROPERTIES('write.parquet.dict-size-bytes'='655 36');
+---- CATCH
+Invalid parquet dictionary page size for Iceberg table: 655 36
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_dict_size4 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.dict-size-bytes'='1073741825');
+---- CATCH
+Parquet dictionary page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_dict_size4 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_dict_size4 SET
+TBLPROPERTIES('write.parquet.dict-size-bytes'='1073741825');
+---- CATCH
+Parquet dictionary page size for Iceberg table should fall in the range of [65536..1073741824]
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_comp_codec1 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.format.default'='orc',
+'write.parquet.compression-codec'='snappy');
+---- CATCH
+write.parquet.compression-codec should be set only for parquet file format
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_comp_codec2 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.compression-codec'='snapp');
+---- CATCH
+Invalid parquet compression codec for Iceberg table: snapp
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_comp_codec2 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_comp_codec2 SET
+TBLPROPERTIES('write.parquet.compression-codec'='snapp');
+---- CATCH
+Invalid parquet compression codec for Iceberg table: snapp
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_comp_level1 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.format.default'='orc',
+'write.parquet.compression-level'='2');
+---- CATCH
+write.parquet.compression-level should be set only for parquet file format
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_comp_level2 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.compression-level'='2');
+---- CATCH
+Parquet compression level cannot be set for codec SNAPPY. Only ZSTD codec supports compression level table property.
+====
+---- QUERY
+CREATE TABLE iceberg_wrong_parquet_comp_level3 ( i int)
+STORED AS ICEBERG
+TBLPROPERTIES('write.parquet.compression-codec'='zstd',
+'write.parquet.compression-level'='0');
+---- CATCH
+Parquet compression level for Iceberg table should fall in the range of [1..22]
+====
+---- QUERY
+CREATE TABLE iceberg_set_wrong_parquet_comp_level4 ( i int)
+STORED AS ICEBERG;
+ALTER TABLE iceberg_set_wrong_parquet_comp_level4 SET
+TBLPROPERTIES('write.parquet.compression-level'='0');
+---- CATCH
+Parquet compression level for Iceberg table should fall in the range of [1..22]
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
index aaa5a5c..137904e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
@@ -368,7 +368,7 @@ STRING,STRING,STRING
 describe formatted iceberg_partitioned;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.tables       '
 ---- TYPES
 string, string, string
@@ -377,7 +377,7 @@ string, string, string
 describe formatted iceberg_non_partitioned;
 ---- RESULTS: VERIFY_IS_SUBSET
 'Location:           ','$NAMENODE/test-warehouse/iceberg_test/iceberg_non_partitioned','NULL'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.tables       '
 ---- TYPES
 string, string, string
@@ -388,7 +388,7 @@ describe formatted hadoop_catalog_test_external;
 'Location:           ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test','NULL'
 '','iceberg.catalog_location','/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test'
 '','iceberg.table_identifier','functional_parquet.hadoop_catalog_test'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
@@ -399,7 +399,7 @@ describe formatted iceberg_partitioned_orc_external;
 'Location:           ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc/functional_parquet/iceberg_partitioned_orc','NULL'
 '','iceberg.catalog_location','/test-warehouse/iceberg_test/hadoop_catalog/iceberg_partitioned_orc'
 '','iceberg.table_identifier','functional_parquet.iceberg_partitioned_orc'
-'','iceberg.file_format ','orc                 '
+'','write.format.default','orc                 '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
@@ -469,7 +469,7 @@ describe formatted iceberg_resolution_test_external;
 'Location:           ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/iceberg_resolution_test/functional_parquet/iceberg_resolution_test','NULL'
 '','iceberg.catalog_location','/test-warehouse/iceberg_test/hadoop_catalog/iceberg_resolution_test'
 '','iceberg.table_identifier','functional_parquet.iceberg_resolution_test'
-'','iceberg.file_format ','parquet             '
+'','write.format.default','parquet             '
 '','iceberg.catalog     ','hadoop.catalog      '
 ---- TYPES
 string, string, string
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 5389a01..fd6ae89 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -561,69 +561,125 @@ CREATE TABLE iceberg_test1 (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_test1 (
   level STRING NULL
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test1 (
   level STRING NULL
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES ('external.table.purge'='TRUE',
+    'write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_test2 (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
-'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_test2 (
   level STRING NULL
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
-'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test2 (
   level STRING NULL
 )
 STORED AS ICEBERG
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+TBLPROPERTIES ('external.table.purge'='TRUE',
+    'write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_test3 (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
-'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
-'iceberg.table_identifier'='org.my_db.my_table')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+    'iceberg.table_identifier'='org.my_db.my_table')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_test3 (
   level STRING NULL
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
-'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
-'iceberg.table_identifier'='org.my_db.my_table')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+    'iceberg.table_identifier'='org.my_db.my_table')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test3 (
   level STRING NULL
 )
 STORED AS ICEBERG
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
-'iceberg.table_identifier'='org.my_db.my_table')
+TBLPROPERTIES ('external.table.purge'='TRUE',
+    'write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.catalog',
+    'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+    'iceberg.table_identifier'='org.my_db.my_table')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_test1_partitioned (
@@ -643,7 +699,12 @@ PARTITIONED BY SPEC (
   TRUNCATE(5, p4)
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet',
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
     'iceberg.catalog'='hadoop.catalog',
     'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
 ---- RESULTS-HIVE-3
@@ -664,8 +725,13 @@ PARTITIONED BY SPEC (
   TRUNCATE(5, p4)
 )
 STORED AS ICEBERG
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.catalog'='hadoop.catalog',
-    'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE',
+    'write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
     'iceberg.catalog'='hadoop.catalog',
     'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
 ====
@@ -674,21 +740,21 @@ CREATE TABLE iceberg_test_orc (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='orc', 'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_test_orc (
   level STRING NULL
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES('iceberg.file_format'='orc', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='orc', 'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test_orc (
   level STRING NULL
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='orc',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='orc',
 'iceberg.catalog'='hadoop.tables')
 ====
 ---- CREATE_TABLE
@@ -703,7 +769,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_default_tbl (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('iceberg.file_format'='parquet',
+TBLPROPERTIES ('write.format.default'='parquet',
 'external.table.purge'='TRUE', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -712,14 +778,14 @@ CREATE TABLE iceberg_default_tbl_orc (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES ('iceberg.file_format'='orc');
+TBLPROPERTIES ('write.format.default'='orc');
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_default_tbl_orc (
   level STRING NULL
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('iceberg.file_format'='orc',
+TBLPROPERTIES ('write.format.default'='orc',
 'external.table.purge'='TRUE', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -735,7 +801,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_hive_cat_explicit (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('iceberg.file_format'='parquet', 'iceberg.catalog'='hive.catalog',
+TBLPROPERTIES ('write.format.default'='parquet', 'iceberg.catalog'='hive.catalog',
 'external.table.purge'='TRUE', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -745,7 +811,13 @@ CREATE TABLE iceberg_nullable_test (
   register_time DATE
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_nullable_test (
   level STRING NOT NULL,
@@ -754,7 +826,13 @@ CREATE TABLE show_create_table_test_db.iceberg_nullable_test (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES('iceberg.file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES('write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_nullable_test (
   level STRING NOT NULL,
@@ -763,8 +841,14 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_nullable_test (
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
-'iceberg.catalog'='hadoop.tables')
+TBLPROPERTIES ('external.table.purge'='TRUE',
+    'write.format.default'='parquet',
+    'write.parquet.compression-codec'='zstd',
+    'write.parquet.compression-level'='12',
+    'write.parquet.row-group-size-bytes'='134217728',
+    'write.parquet.page-size-bytes'='65536',
+    'write.parquet.dict-size-bytes'='131072',
+    'iceberg.catalog'='hadoop.tables')
 ====
 ---- CREATE_TABLE
 CREATE TABLE iceberg_old_style_partitions (i INT)
@@ -779,7 +863,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_old_style_partitions (
 PARTITIONED BY SPEC (p, d)
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -796,7 +880,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas (
 PARTITIONED BY SPEC (BUCKET(5, id))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -814,7 +898,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_ctas_ht (
 PARTITIONED BY SPEC (BUCKET(5, id))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'iceberg.catalog'='hadoop.tables')
 ====
 ---- CREATE_TABLE
@@ -827,7 +911,7 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_catalogs_hive (i INT NUL
 PARTITIONED BY SPEC (BUCKET(3, i))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'iceberg.catalog'='ice_hive_cat', 'table_type'='ICEBERG')
 ====
 ---- CREATE_TABLE
@@ -840,6 +924,6 @@ CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_catalogs_hadoop (i INT N
 PARTITIONED BY SPEC (BUCKET(3, i))
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg.file_format'='parquet',
+TBLPROPERTIES ('external.table.purge'='TRUE', 'write.format.default'='parquet',
 'iceberg.catalog'='ice_hadoop_cat')
 ====