You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/28 00:27:09 UTC

(impala) branch master updated: IMPALA-12742: DELETE/UPDATE Iceberg table partitioned by DATE fails with error

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 595212b4e IMPALA-12742: DELETE/UPDATE Iceberg table partitioned by DATE fails with error
595212b4e is described below

commit 595212b4ea4f0cabd2504090626f5753f8feb865
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Jan 25 18:13:20 2024 +0100

    IMPALA-12742: DELETE/UPDATE Iceberg table partitioned by DATE fails with error
    
    Iceberg tables can be identity partitioned by any type, e.g. int, date
    and even float. If a table is partitioned, the file path contains the
    partition value in human readable form, and this form is expected to
    be passed to CatalogD. When an UPDATE or DELETE command is executed,
    we don't transform the integer date value to human readable format,
    which causes errors in CatalogD.
    
    With this patch, we transform identity-partitioned date values to
    human-readable format.
    
    Note on floating point numbers:
    When users partition their data via floating point values (users should
    not do that), then the file paths created for delete files might not
    correspond to the data files (e.g. '1.1' vs '1.100000023841858'). Though
    the values are the same in the Iceberg metadata layer, so it doesn't
    cause correctness issues.
    
    Testing:
     * added e2e tests for DELETEs
     * added e2e tests for UPDATEs
    
    Change-Id: I506f95527e741efe18c71706e2cdea51b45958b8
    Reviewed-on: http://gerrit.cloudera.org:8080/20954
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/iceberg-delete-sink-base.cc            | 68 ++++++++++++++++------
 be/src/exec/iceberg-delete-sink-base.h             |  7 ++-
 be/src/exec/table-sink-base.cc                     |  2 +-
 be/src/runtime/descriptors.cc                      |  3 +-
 be/src/runtime/descriptors.h                       | 12 +---
 common/thrift/CatalogObjects.thrift                |  1 +
 .../impala/analysis/IcebergPartitionField.java     | 13 ++++-
 .../org/apache/impala/catalog/FeIcebergTable.java  | 15 +++--
 .../org/apache/impala/catalog/IcebergTable.java    |  3 +-
 .../java/org/apache/impala/catalog/ScalarType.java | 25 ++++----
 .../main/java/org/apache/impala/catalog/Type.java  | 34 ++++++-----
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |  2 +-
 .../org/apache/impala/util/IcebergUtilTest.java    |  6 +-
 .../QueryTest/iceberg-delete-partitioned.test      | 20 +++++++
 .../QueryTest/iceberg-update-partitions.test       | 23 ++++++++
 15 files changed, 160 insertions(+), 74 deletions(-)

diff --git a/be/src/exec/iceberg-delete-sink-base.cc b/be/src/exec/iceberg-delete-sink-base.cc
index 2ad64589e..eaadec4c0 100644
--- a/be/src/exec/iceberg-delete-sink-base.cc
+++ b/be/src/exec/iceberg-delete-sink-base.cc
@@ -62,12 +62,47 @@ Status IcebergDeleteSinkBase::Open(RuntimeState* state) {
   return Status::OK();
 }
 
+inline bool IsDate(const TScalarType& tscalar) {
+  return tscalar.type == TPrimitiveType::DATE;
+}
+
+inline bool IsTimestamp(const TScalarType& tscalar) {
+  return tscalar.type == TPrimitiveType::TIMESTAMP;
+}
+
+inline bool IsDateTime(const TScalarType& tscalar) {
+  return tscalar.type == TPrimitiveType::DATETIME;
+}
+
 std::string IcebergDeleteSinkBase::HumanReadablePartitionValue(
-    TIcebergPartitionTransformType::type transform_type, const std::string& value,
+    const TIcebergPartitionField& part_field, const std::string& value,
     Status* transform_result) {
-  if (!iceberg::IsTimeBasedPartition(transform_type) ||
-    value == table_desc_->null_partition_key_value()) {
-    *transform_result = Status::OK();
+  *transform_result = Status::OK();
+  TIcebergPartitionTransformType::type transform_type =
+      part_field.transform.transform_type;
+  if (value == table_desc_->null_partition_key_value()) {
+    // We don't need to transfrom NULL values.
+    return value;
+  }
+  if (transform_type == TIcebergPartitionTransformType::IDENTITY) {
+    const TScalarType& scalar_type = part_field.type;
+    // Timestamp and DateTime (not even implemented yet) types are not supported for
+    // IDENTITY-partitioning.
+    if (IsTimestamp(scalar_type) || IsDateTime(scalar_type)) {
+      *transform_result = Status(Substitute(
+        "Unsupported type for IDENTITY-partitioning: $0", to_string(scalar_type.type)));
+      return value;
+    }
+    if (IsDate(scalar_type)) {
+      // With IDENTITY partitioning, only DATEs are problematic, because DATEs are stored
+      // as an offset from the unix epoch. So we need to handle the values similarly to
+      // the DAY-transformed values.
+      return iceberg::HumanReadableTime(
+          TIcebergPartitionTransformType::DAY, value, transform_result);
+    }
+  }
+  if (!iceberg::IsTimeBasedPartition(transform_type)) {
+    // Don't need to convert values of non-time transforms.
     return value;
   }
   return iceberg::HumanReadableTime(transform_type, value, transform_result);
@@ -105,13 +140,11 @@ Status IcebergDeleteSinkBase::ConstructPartitionInfo(int32_t spec_id,
   }
   output_partition->iceberg_spec_id = spec_id;
 
-  vector<string> non_void_partition_names;
-  vector<TIcebergPartitionTransformType::type> non_void_partition_transforms;
+  vector<TIcebergPartitionField> non_void_partition_fields;
   if (LIKELY(spec_id == table_desc_->IcebergSpecId())) {
     // If 'spec_id' is the default spec id, then just copy the already populated
-    // non void partition names and transforms.
-    non_void_partition_names = table_desc_->IcebergNonVoidPartitionNames();
-    non_void_partition_transforms = table_desc_->IcebergNonVoidPartitionTransforms();
+    // non void partition fields.
+    non_void_partition_fields = table_desc_->IcebergNonVoidPartitionFields();
   } else {
     // Otherwise collect the non-void partition names belonging to 'spec_id'.
     const TIcebergPartitionSpec& partition_spec =
@@ -119,13 +152,12 @@ Status IcebergDeleteSinkBase::ConstructPartitionInfo(int32_t spec_id,
     for (const TIcebergPartitionField& spec_field : partition_spec.partition_fields) {
       auto transform_type = spec_field.transform.transform_type;
       if (transform_type != TIcebergPartitionTransformType::VOID) {
-        non_void_partition_names.push_back(spec_field.field_name);
-        non_void_partition_transforms.push_back(transform_type);
+        non_void_partition_fields.push_back(spec_field);
       }
     }
   }
 
-  if (non_void_partition_names.empty()) {
+  if (non_void_partition_fields.empty()) {
     DCHECK(partition_values_str.empty());
     return Status::OK();
   }
@@ -142,23 +174,21 @@ Status IcebergDeleteSinkBase::ConstructPartitionInfo(int32_t spec_id,
     partition_values_decoded.push_back(std::move(decoded_val));
   }
 
-  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_names.size());
-  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_transforms.size());
+  DCHECK_EQ(partition_values_decoded.size(), non_void_partition_fields.size());
 
   stringstream url_encoded_partition_name_ss;
 
   for (int i = 0; i < partition_values_decoded.size(); ++i) {
-    auto transform_type = non_void_partition_transforms[i];
+    const TIcebergPartitionField& part_field = non_void_partition_fields[i];
     stringstream raw_partition_key_value_ss;
     stringstream url_encoded_partition_key_value_ss;
 
-    raw_partition_key_value_ss << non_void_partition_names[i] << "=";
-    url_encoded_partition_key_value_ss << non_void_partition_names[i] << "=";
+    raw_partition_key_value_ss << part_field.field_name << "=";
+    url_encoded_partition_key_value_ss << part_field.field_name << "=";
 
     string& value_str = partition_values_decoded[i];
     Status transform_status;
-    value_str = HumanReadablePartitionValue(
-        transform_type, value_str, &transform_status);
+    value_str = HumanReadablePartitionValue(part_field, value_str, &transform_status);
     if (!transform_status.ok()) return transform_status;
     raw_partition_key_value_ss << value_str;
 
diff --git a/be/src/exec/iceberg-delete-sink-base.h b/be/src/exec/iceberg-delete-sink-base.h
index e4fe15547..f1e19481b 100644
--- a/be/src/exec/iceberg-delete-sink-base.h
+++ b/be/src/exec/iceberg-delete-sink-base.h
@@ -53,12 +53,13 @@ class IcebergDeleteSinkBase : public TableSinkBase {
   /// Returns the human-readable representation of a partition transform value. It is used
   /// to create the file paths. IcebergUtil.partitionDataFromDataFile() also expects
   /// partition values in this representation.
-  /// E.g. if 'transform_type' is MONTH and 'value' is "7" this function returns
-  /// "1970-08".
+  /// E.g. if 'part_field' has transform MONTH and 'value' is "7" this function returns
+  /// "1970-08". If 'part_field' has transform IDENTITY but the column is DATE we also
+  /// need to transform the partition value to a human-readable format.
   /// Parse errors are set in 'transform_result'. If it is not OK, the return value
   /// of this function does not contain any meaningful value.
   std::string HumanReadablePartitionValue(
-      TIcebergPartitionTransformType::type transform_type, const std::string& value,
+      const TIcebergPartitionField& part_field, const std::string& value,
       Status* transform_result);
 };
 
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
index 4fc9cf7b9..c21ed002a 100644
--- a/be/src/exec/table-sink-base.cc
+++ b/be/src/exec/table-sink-base.cc
@@ -98,7 +98,7 @@ Status TableSinkBase::ClosePartitionFile(
 string TableSinkBase::GetPartitionName(int i) {
   if (IsIceberg()) {
     DCHECK_LT(i, partition_key_expr_evals_.size());
-    return table_desc_->IcebergNonVoidPartitionNames()[i];
+    return table_desc_->IcebergNonVoidPartitionFields()[i].field_name;
   } else {
     DCHECK_LT(i, table_desc_->num_clustering_cols());
     return table_desc_->col_descs()[i].name();
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index c3567fdba..1d472c0df 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -264,8 +264,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
     for (const TIcebergPartitionField& spec_field : spec.partition_fields) {
       auto transform_type = spec_field.transform.transform_type;
       if (transform_type == TIcebergPartitionTransformType::VOID) continue;
-      iceberg_non_void_partition_names_.push_back(spec_field.field_name);
-      iceberg_non_void_partition_transforms_.push_back(transform_type);
+      iceberg_non_void_partition_fields_.push_back(spec_field);
     }
     iceberg_parquet_compression_codec_ = tdesc.icebergTable.parquet_compression_codec;
     iceberg_parquet_row_group_size_ = tdesc.icebergTable.parquet_row_group_size;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 35e585315..c3f5058f1 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -476,12 +476,8 @@ class HdfsTableDescriptor : public TableDescriptor {
   const std::vector<TIcebergPartitionSpec>& IcebergPartitionSpecs() const {
     return iceberg_partition_specs_;
   }
-  const std::vector<std::string>& IcebergNonVoidPartitionNames() const {
-    return iceberg_non_void_partition_names_;
-  }
-  const std::vector<TIcebergPartitionTransformType::type>&
-      IcebergNonVoidPartitionTransforms() const {
-    return iceberg_non_void_partition_transforms_;
+  const std::vector<TIcebergPartitionField>& IcebergNonVoidPartitionFields() const {
+    return iceberg_non_void_partition_fields_;
   }
   const TCompressionCodec& IcebergParquetCompressionCodec() const {
     return iceberg_parquet_compression_codec_;
@@ -517,9 +513,7 @@ class HdfsTableDescriptor : public TableDescriptor {
   bool is_iceberg_ = false;
   std::string iceberg_table_location_;
   std::vector<TIcebergPartitionSpec> iceberg_partition_specs_;
-  std::vector<std::string> iceberg_non_void_partition_names_;
-  std::vector<TIcebergPartitionTransformType::type>
-      iceberg_non_void_partition_transforms_;
+  std::vector<TIcebergPartitionField> iceberg_non_void_partition_fields_;
   TCompressionCodec iceberg_parquet_compression_codec_;
   int64_t iceberg_parquet_row_group_size_;
   int64_t iceberg_parquet_plain_page_size_;
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 4fc84476e..158ee9385 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -609,6 +609,7 @@ struct TIcebergPartitionField {
   3: required string orig_field_name
   4: required string field_name
   5: required TIcebergPartitionTransform transform
+  6: required Types.TScalarType type
 }
 
 struct TIcebergPartitionSpec {
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
index 2e9915a9e..4ccd53ae0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
@@ -17,6 +17,9 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
@@ -43,18 +46,23 @@ public class IcebergPartitionField extends StmtNode {
   // Partition transform type and transform param for this partition field.
   private IcebergPartitionTransform transform_;
 
+  // Result type of the partition field. Must be a primitive type.
+  private ScalarType type_;
+
   public IcebergPartitionField(int sourceId, int fieldId, String origFieldName,
-      String fieldName, IcebergPartitionTransform transform) {
+      String fieldName, IcebergPartitionTransform transform, Type type) {
+    Preconditions.checkState(type.isScalarType());
     sourceId_ = sourceId;
     fieldId_ = fieldId;
     origFieldName_ = origFieldName;
     fieldName_ = fieldName;
     transform_ = transform;
+    type_ = (ScalarType)type;
   }
 
   // This constructor is called when creating a partitioned Iceberg table.
   public IcebergPartitionField(String fieldName, IcebergPartitionTransform transform) {
-    this(0, 0, fieldName, fieldName, transform);
+    this(0, 0, fieldName, fieldName, transform, Type.NULL);
   }
 
   public String getFieldName() {
@@ -97,6 +105,7 @@ public class IcebergPartitionField extends StmtNode {
     result.setOrig_field_name(origFieldName_);
     result.setField_name(fieldName_);
     result.setTransform(transform_.toThrift());
+    result.setType(type_.toTScalarType());
     return result;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index d04cf2530..c839f80bb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -53,6 +53,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.impala.analysis.IcebergPartitionField;
@@ -87,6 +88,7 @@ import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.impala.util.IcebergSchemaConverter;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.TResultRowBuilder;
@@ -889,21 +891,24 @@ public interface FeIcebergTable extends FeFsTable {
     public static List<IcebergPartitionSpec> loadPartitionSpecByIceberg(
         FeIcebergTable table) throws ImpalaRuntimeException {
       List<IcebergPartitionSpec> ret = new ArrayList<>();
-      for (PartitionSpec spec : table.getIcebergApiTable().specs().values()) {
-        ret.add(convertPartitionSpec(spec));
+      Table iceApiTable = table.getIcebergApiTable();
+      for (PartitionSpec spec : iceApiTable.specs().values()) {
+        ret.add(convertPartitionSpec(iceApiTable.schema(), spec));
       }
       return ret;
     }
 
-    public static IcebergPartitionSpec convertPartitionSpec(PartitionSpec spec)
-        throws ImpalaRuntimeException {
+    public static IcebergPartitionSpec convertPartitionSpec(Schema schema,
+        PartitionSpec spec) throws ImpalaRuntimeException {
       List<IcebergPartitionField> fields = new ArrayList<>();
       Map<String, Integer> transformParams =
           IcebergUtil.getPartitionTransformParams(spec);
       for (PartitionField field : spec.fields()) {
         fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
             spec.schema().findColumnName(field.sourceId()), field.name(),
-            IcebergUtil.getPartitionTransform(field, transformParams)));
+            IcebergUtil.getPartitionTransform(field, transformParams),
+            IcebergSchemaConverter.toImpalaType(
+                field.transform().getResultType(schema.findType(field.sourceId())))));
       }
       return new IcebergPartitionSpec(spec.specId(), fields);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index d789e12c5..4e927c06c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -559,7 +559,8 @@ public class IcebergTable extends Table implements FeIcebergTable {
           fields.add(new IcebergPartitionField(field.getSource_id(), field.getField_id(),
               field.getOrig_field_name(), field.getField_name(),
               new IcebergPartitionTransform(field.getTransform().getTransform_type(),
-                  transformParam)));
+                  transformParam),
+              Type.fromTScalarType(field.getType())));
         }
         ret.add(new IcebergPartitionSpec(param.getSpec_id(),
             fields));
diff --git a/fe/src/main/java/org/apache/impala/catalog/ScalarType.java b/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
index 12f7a63f0..f466e2530 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ScalarType.java
@@ -215,38 +215,37 @@ public class ScalarType extends Type {
     return StringUtils.repeat(' ', lpad) + toSql();
   }
 
-  @Override
-  public void toThrift(TColumnType container) {
-    TTypeNode node = new TTypeNode();
-    container.types.add(node);
+  public TScalarType toTScalarType() {
+    TScalarType scalarType = new TScalarType();
     switch(type_) {
       case VARCHAR:
       case CHAR:
       case FIXED_UDA_INTERMEDIATE: {
-        node.setType(TTypeNodeType.SCALAR);
-        TScalarType scalarType = new TScalarType();
         scalarType.setType(type_.toThrift());
         scalarType.setLen(len_);
-        node.setScalar_type(scalarType);
         break;
       }
       case DECIMAL: {
-        node.setType(TTypeNodeType.SCALAR);
-        TScalarType scalarType = new TScalarType();
         scalarType.setType(type_.toThrift());
         scalarType.setScale(scale_);
         scalarType.setPrecision(precision_);
-        node.setScalar_type(scalarType);
         break;
       }
       default: {
-        node.setType(TTypeNodeType.SCALAR);
-        TScalarType scalarType = new TScalarType();
         scalarType.setType(type_.toThrift());
-        node.setScalar_type(scalarType);
         break;
       }
     }
+    return scalarType;
+  }
+
+  @Override
+  public void toThrift(TColumnType container) {
+    TTypeNode node = new TTypeNode();
+    container.types.add(node);
+    TScalarType scalarType = toTScalarType();
+    node.setType(TTypeNodeType.SCALAR);
+    node.setScalar_type(scalarType);
   }
 
   public int decimalPrecision() {
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java b/fe/src/main/java/org/apache/impala/catalog/Type.java
index b30036ae3..f0246d2f9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -434,6 +434,23 @@ public abstract class Type {
     return t.first;
   }
 
+  public static Type fromTScalarType(TScalarType scalarType) {
+    if (scalarType.getType() == TPrimitiveType.CHAR) {
+      Preconditions.checkState(scalarType.isSetLen());
+      return ScalarType.createCharType(scalarType.getLen());
+    } else if (scalarType.getType() == TPrimitiveType.VARCHAR) {
+      Preconditions.checkState(scalarType.isSetLen());
+      return ScalarType.createVarcharType(scalarType.getLen());
+    } else if (scalarType.getType() == TPrimitiveType.DECIMAL) {
+      Preconditions.checkState(scalarType.isSetPrecision()
+          && scalarType.isSetScale());
+      return ScalarType.createDecimalType(scalarType.getPrecision(),
+          scalarType.getScale());
+    } else {
+      return ScalarType.createType(PrimitiveType.fromThrift(scalarType.getType()));
+    }
+  }
+
   /**
    * Constructs a ColumnType rooted at the TTypeNode at nodeIdx in TColumnType.
    * Returned pair: The resulting ColumnType and the next nodeIdx that is not a child
@@ -445,22 +462,7 @@ public abstract class Type {
     switch (node.getType()) {
       case SCALAR: {
         Preconditions.checkState(node.isSetScalar_type());
-        TScalarType scalarType = node.getScalar_type();
-        if (scalarType.getType() == TPrimitiveType.CHAR) {
-          Preconditions.checkState(scalarType.isSetLen());
-          type = ScalarType.createCharType(scalarType.getLen());
-        } else if (scalarType.getType() == TPrimitiveType.VARCHAR) {
-          Preconditions.checkState(scalarType.isSetLen());
-          type = ScalarType.createVarcharType(scalarType.getLen());
-        } else if (scalarType.getType() == TPrimitiveType.DECIMAL) {
-          Preconditions.checkState(scalarType.isSetPrecision()
-              && scalarType.isSetScale());
-          type = ScalarType.createDecimalType(scalarType.getPrecision(),
-              scalarType.getScale());
-        } else {
-          type = ScalarType.createType(
-              PrimitiveType.fromThrift(scalarType.getType()));
-        }
+        type = fromTScalarType(node.getScalar_type());
         ++nodeIdx;
         break;
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 4edd9271a..5ceeefd07 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -141,7 +141,7 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
         "Exception caught during generating Iceberg schema:", ex);
     }
     IcebergPartitionSpec resolvedIcebergSpec =
-        FeIcebergTable.Utils.convertPartitionSpec(iceSpec);
+        FeIcebergTable.Utils.convertPartitionSpec(iceSchema_, iceSpec);
     partitionSpecs_.add(resolvedIcebergSpec);
   }
 
diff --git a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
index 3482c7d41..0090dd75c 100644
--- a/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/IcebergUtilTest.java
@@ -305,7 +305,8 @@ public class IcebergUtilTest {
       IcebergPartitionTransform icebergPartitionTransform =
           new IcebergPartitionTransform(TIcebergPartitionTransformType.IDENTITY);
       IcebergPartitionField field =
-          new IcebergPartitionField(id, 106, "name", "name", icebergPartitionTransform);
+          new IcebergPartitionField(id, 106, "name", "name", icebergPartitionTransform,
+              column.getType());
       ImmutableList<IcebergPartitionField> fieldList = ImmutableList.of(field);
       IcebergPartitionSpec icebergPartitionSpec = new IcebergPartitionSpec(4, fieldList);
       assertTrue(isPartitionColumn(column, icebergPartitionSpec));
@@ -319,7 +320,8 @@ public class IcebergUtilTest {
       IcebergPartitionTransform icebergPartitionTransform =
           new IcebergPartitionTransform(TIcebergPartitionTransformType.IDENTITY);
       IcebergPartitionField field =
-          new IcebergPartitionField(107, 106, "name", "name", icebergPartitionTransform);
+          new IcebergPartitionField(107, 106, "name", "name", icebergPartitionTransform,
+              column.getType());
       ImmutableList<IcebergPartitionField> fieldList = ImmutableList.of(field);
       IcebergPartitionSpec icebergPartitionSpec = new IcebergPartitionSpec(4, fieldList);
       assertFalse(isPartitionColumn(column, icebergPartitionSpec));
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
index f2809539b..0e8d8e973 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-partitioned.test
@@ -783,3 +783,23 @@ row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/numeric_truncate/data/int_col_t
 ---- TYPES
 STRING, STRING, STRING, STRING
 ====
+---- QUERY
+create table ice_alltypes_part_v2 (i INT NULL, p_bool BOOLEAN NULL, p_int INT NULL, p_bigint BIGINT NULL,
+    p_float FLOAT NULL, p_double DOUBLE NULL, p_decimal DECIMAL(6,3) NULL, p_date DATE NULL, p_string STRING NULL)
+PARTITIONED BY SPEC (p_bool, p_int, p_bigint, p_float, p_double, p_decimal, p_date, p_string)
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
+insert into ice_alltypes_part_v2 select * from functional_parquet.iceberg_alltypes_part;
+---- DML_RESULTS: ice_alltypes_part_v2
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+delete from ice_alltypes_part_v2 where i=1;
+---- DML_RESULTS: ice_alltypes_part_v2
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
index cb3becfa6..5cd376882 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-update-partitions.test
@@ -458,3 +458,26 @@ UPDATE numeric_truncate SET dec_10_2 = 75.20, dec_8_0 = 531, bigint_col = 2111,
 ---- TYPES
 INT,INT,BIGINT,DECIMAL,DECIMAL
 ====
+---- QUERY
+create table ice_alltypes_part_v2 (i INT NULL, p_bool BOOLEAN NULL, p_int INT NULL, p_bigint BIGINT NULL,
+    p_float FLOAT NULL, p_double DOUBLE NULL, p_decimal DECIMAL(6,3) NULL, p_date DATE NULL, p_string STRING NULL)
+PARTITIONED BY SPEC (p_bool, p_int, p_bigint, p_float, p_double, p_decimal, p_date, p_string)
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
+insert into ice_alltypes_part_v2 select * from functional_parquet.iceberg_alltypes_part;
+---- DML_RESULTS: ice_alltypes_part_v2
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+update ice_alltypes_part_v2 set i = cast(i + 1 as int);
+update ice_alltypes_part_v2 set p_int = i;
+update ice_alltypes_part_v2 set p_date = add_months(p_date, i);
+---- DML_RESULTS: ice_alltypes_part_v2
+2,true,2,11,1.100000023841858,2.222,123.321,2022-04-22,'impala'
+3,true,3,11,1.100000023841858,2.222,123.321,2022-05-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====