Posted to commits@impala.apache.org by bo...@apache.org on 2022/12/15 14:11:08 UTC

[impala] branch master updated (a551ed5e7 -> 33929bfcc)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from a551ed5e7 IMPALA-11800: Pin versions:set to 2.13.0 to avoid regression
     new 05a4b778d IMPALA-11339: Add Iceberg LOAD DATA INPATH statement
     new 33929bfcc IMPALA-11787, IMPALA-11516: Cardinality estimate for UNION in Iceberg position-delete plans can double the actual table cardinality

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/service/client-request-state.cc             |  53 ++++
 be/src/service/client-request-state.h              |   3 +
 common/fbs/IcebergObjects.fbs                      |   1 +
 common/thrift/Frontend.thrift                      |  23 ++
 .../org/apache/impala/analysis/LoadDataStmt.java   | 105 ++++++-
 .../apache/impala/analysis/QueryStringBuilder.java | 155 +++++++++++
 .../org/apache/impala/common/FileSystemUtil.java   |  22 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  16 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  63 ++++-
 .../apache/impala/planner/IcebergScanPlanner.java  |  80 +++++-
 .../java/org/apache/impala/service/Frontend.java   |  50 +++-
 .../java/org/apache/impala/util/IcebergUtil.java   |  23 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  16 ++
 .../org/apache/impala/planner/PlannerTest.java     |   3 +-
 .../org/apache/impala/planner/PlannerTestBase.java |   5 +
 .../queries/PlannerTest/iceberg-v2-tables.test     | 307 +++++++++++++++++----
 .../queries/PlannerTest/tablesample.test           |  12 +-
 .../queries/QueryTest/iceberg-load.test            | 142 ++++++++++
 tests/query_test/test_iceberg.py                   |  52 +++-
 19 files changed, 1026 insertions(+), 105 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test


[impala] 02/02: IMPALA-11787, IMPALA-11516: Cardinality estimate for UNION in Iceberg position-delete plans can double the actual table cardinality

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 33929bfccc995c22890ef8783d5f4671ef30bcae
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Dec 13 19:10:47 2022 +0100

    IMPALA-11787, IMPALA-11516: Cardinality estimate for UNION in Iceberg position-delete plans can double the actual table cardinality
    
    The plan for Iceberg tables with position-delete files includes a UNION
    operator that takes the following inputs:
      LHS: Scan of the data files that don't have corresponding delete files
      RHS: ANTI JOIN that filters the data files that do have corresponding
           delete files based on the content of the delete files.
    
    The planner's cardinality estimate for each of these two UNION inputs
    can be as large as the full row count of the table (assuming no other
    predicates in the scan), and the planner simply sums them for the
    UNION, which can yield a cardinality estimate for the UNION that is
    twice the actual size of the table.
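
    As a concrete example (numbers from the planner test table
    iceberg_v2_positional_not_all_data_files_have_delete_files updated
    below, which holds 10 rows: 4 in data files without deletes and 6 in
    data files that have delete files):

      old estimate: card(UNION) = card(LHS) + card(RHS) = 10 + 10 = 20
      new estimate: card(UNION) = 4 + 6 = 10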
    
    In this patch IcebergScanNode overrides HdfsScanNode's
    computeCardinalities(). The method is implemented similarly, with a
    few modifications:
    
    * we know the exact record counts of the data files
    * for table sampling we know the sampled file descriptors, hence
      their record counts as well
    * IDENTITY-based partition conjuncts have already filtered out the
      files, so we don't need to apply their selectivity
    
    As a result, the scan node's cardinalities are calculated much more
    precisely. This patch also sets the column stats for the virtual
    columns of the scan node on the left-hand side of the ANTI JOIN.
    However, because of IMPALA-11797 the ANTI JOIN's cardinality
    currently always equals the LHS cardinality; IMPALA-11619 can also
    resolve this.
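
    A minimal sketch of the new scan-node estimate (simplified, assuming
    the per-file record counts have already been collected from the
    Iceberg file metadata; the full logic is in
    IcebergScanNode.computeCardinalities() in the diff below):

        // Sketch only: sum the exact per-file record counts, then apply the
        // combined selectivity of the non-identity conjuncts. Conjuncts on
        // IDENTITY-partitioned columns have already pruned files, so their
        // selectivity is not applied again.
        static long estimateCardinality(java.util.List<Long> fileRecordCounts,
            double nonIdentitySelectivity, long limit) {
          long cardinality = 0;
          for (long recordCount : fileRecordCounts) cardinality += recordCount;
          cardinality = Math.round(cardinality * nonIdentitySelectivity);
          return limit >= 0 ? Math.min(cardinality, limit) : cardinality;
        }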
    
    Testing:
     * planner tests updated
    
    Change-Id: Ie2927c58c4adfd0ba1e135b63454ac9b07991cbf
    Reviewed-on: http://gerrit.cloudera.org:8080/19354
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/fbs/IcebergObjects.fbs                      |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java    |  16 +-
 .../org/apache/impala/planner/IcebergScanNode.java |  63 ++++-
 .../apache/impala/planner/IcebergScanPlanner.java  |  80 +++++-
 .../java/org/apache/impala/util/IcebergUtil.java   |  23 +-
 .../org/apache/impala/planner/PlannerTest.java     |   3 +-
 .../org/apache/impala/planner/PlannerTestBase.java |   5 +
 .../queries/PlannerTest/iceberg-v2-tables.test     | 307 +++++++++++++++++----
 .../queries/PlannerTest/tablesample.test           |  12 +-
 9 files changed, 419 insertions(+), 91 deletions(-)

diff --git a/common/fbs/IcebergObjects.fbs b/common/fbs/IcebergObjects.fbs
index a5a8c6add..db7fac7d1 100644
--- a/common/fbs/IcebergObjects.fbs
+++ b/common/fbs/IcebergObjects.fbs
@@ -46,6 +46,7 @@ table FbIcebergPartitionTransformValue {
 
 table FbIcebergMetadata {
   file_format : FbIcebergDataFileFormat;
+  record_count : long;
   partition_keys : [FbIcebergPartitionTransformValue];
 }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cb663e432..adbc54aae 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -328,6 +328,9 @@ public class HdfsScanNode extends ScanNode {
   // this scan node has the count(*) optimization enabled.
   protected SlotDescriptor countStarSlot_ = null;
 
+  // Sampled file descriptors if table sampling is used.
+  Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles_ = null;
+
   // Conjuncts used to trim the set of partitions passed to this node.
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
@@ -1135,7 +1138,6 @@ public class HdfsScanNode extends ScanNode {
    */
   private void computeScanRangeLocations(Analyzer analyzer)
       throws ImpalaRuntimeException {
-    Map<SampledPartitionMetadata, List<FileDescriptor>> sampledFiles = null;
     if (sampleParams_ != null) {
       long percentBytes = sampleParams_.getPercentBytes();
       long randomSeed;
@@ -1147,15 +1149,15 @@ public class HdfsScanNode extends ScanNode {
       // Pass a minimum sample size of 0 because users cannot set a minimum sample size
       // for scans directly. For compute stats, a minimum sample size can be set, and
       // the sampling percent is adjusted to reflect it.
-      sampledFiles = getFilesSample(percentBytes, 0, randomSeed);
+      sampledFiles_ = getFilesSample(percentBytes, 0, randomSeed);
     }
 
     long scanRangeBytesLimit = analyzer.getQueryCtx().client_request.getQuery_options()
         .getMax_scan_range_length();
     scanRangeSpecs_ = new TScanRangeSpec();
 
-    if (sampledFiles != null) {
-      numPartitionsPerFs_ = sampledFiles.keySet().stream().collect(Collectors.groupingBy(
+    if (sampledFiles_ != null) {
+      numPartitionsPerFs_ = sampledFiles_.keySet().stream().collect(Collectors.groupingBy(
           SampledPartitionMetadata::getPartitionFsType, Collectors.counting()));
     } else {
       numPartitionsPerFs_.putAll(partitions_.stream().collect(
@@ -1191,9 +1193,9 @@ public class HdfsScanNode extends ScanNode {
       // conservatively estimate 1 row per file
       simpleLimitNumRows += fileDescs.size();
 
-      if (sampledFiles != null) {
+      if (sampledFiles_ != null) {
         // If we are sampling, check whether this partition is included in the sample.
-        fileDescs = sampledFiles.get(
+        fileDescs = sampledFiles_.get(
             new SampledPartitionMetadata(partition.getId(), partition.getFsType()));
         if (fileDescs == null) continue;
       }
@@ -1482,7 +1484,7 @@ public class HdfsScanNode extends ScanNode {
    * Sets these members:
    * extrapolatedNumRows_, inputCardinality_, cardinality_
    */
-  private void computeCardinalities(Analyzer analyzer) {
+  protected void computeCardinalities(Analyzer analyzer) {
     // Choose between the extrapolated row count and the one based on stored stats.
     extrapolatedNumRows_ = FeFsTable.Utils.getExtrapolatedNumRows(tbl_,
             sumValues(totalBytesPerFs_));
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index d85ada46c..9325b3b29 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.TableRef;
@@ -34,9 +35,12 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.fb.FbIcebergDataFileFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -45,11 +49,18 @@ import com.google.common.collect.Lists;
  * Scan of a single iceberg table.
  */
 public class IcebergScanNode extends HdfsScanNode {
+  private final static Logger LOG = LoggerFactory.getLogger(IcebergScanNode.class);
 
   private List<FileDescriptor> fileDescs_;
+  // Conjuncts on columns not involved in IDENTITY-partitioning. Subset of 'conjuncts_',
+  // but this does not include conjuncts on IDENTITY-partitioned columns, because such
+  // conjuncts have already been pushed to Iceberg to filter out partitions/files, so
+  // they don't have further selectivity on the surviving files.
+  private List<Expr> nonIdentityConjuncts_;
 
   public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts,
-      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs)
+      MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs,
+      List<Expr> nonIdentityConjuncts)
       throws ImpalaRuntimeException {
     super(id, tblRef.getDesc(), conjuncts,
         getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef,
@@ -58,6 +69,7 @@ public class IcebergScanNode extends HdfsScanNode {
     Preconditions.checkState(partitions_.size() == 1);
 
     fileDescs_ = fileDescs;
+    nonIdentityConjuncts_ = nonIdentityConjuncts;
     //TODO IMPALA-11577: optimize file format counting
     boolean hasParquet = false;
     boolean hasOrc = false;
@@ -90,10 +102,53 @@ public class IcebergScanNode extends HdfsScanNode {
   }
 
   /**
-   * In some cases we exactly know the cardinality, e.g. POSITION DELETE scan node.
+   * Computes cardinalities of the Iceberg scan node. Implemented based on
+   * HdfsScanNode.computeCardinalities with some modifications:
+   *   - we exactly know the record counts of the data files
+   *   - IDENTITY-based partition conjuncts already filtered out the files, so
+   *     we don't need their selectivity
    */
-  public void setCardinality(long cardinality) {
-    cardinality_ = cardinality;
+  @Override
+  protected void computeCardinalities(Analyzer analyzer) {
+    cardinality_ = 0;
+
+    if (sampledFiles_ != null) {
+      for (List<FileDescriptor> sampledFileDescs : sampledFiles_.values()) {
+        for (FileDescriptor fd : sampledFileDescs) {
+          cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
+        }
+      }
+    } else {
+      for (FileDescriptor fd : fileDescs_) {
+        cardinality_ += fd.getFbFileMetadata().icebergMetadata().recordCount();
+      }
+    }
+
+    // Adjust cardinality for all collections referenced along the tuple's path.
+    for (Type t: desc_.getPath().getMatchedTypes()) {
+      if (t.isCollectionType()) cardinality_ *= PlannerContext.AVG_COLLECTION_SIZE;
+    }
+    inputCardinality_ = cardinality_;
+
+    if (cardinality_ > 0) {
+      double selectivity = computeCombinedSelectivity(nonIdentityConjuncts_);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("cardinality_=" + Long.toString(cardinality_) +
+                  " sel=" + Double.toString(selectivity));
+      }
+      cardinality_ = applySelectivity(cardinality_, selectivity);
+    }
+
+    cardinality_ = capCardinalityAtLimit(cardinality_);
+
+    if (countStarSlot_ != null) {
+      // We are doing optimized count star. Override cardinality with total num files.
+      inputCardinality_ = fileDescs_.size();
+      cardinality_ = fileDescs_.size();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("IcebergScanNode: cardinality_=" + Long.toString(cardinality_));
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index fbbce6455..730ce3af9 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
@@ -51,6 +52,7 @@ import org.apache.impala.analysis.IsNullPredicate;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.analysis.DateLiteral;
 import org.apache.impala.analysis.MultiAggregateInfo;
@@ -80,6 +82,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TColumnStats;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.ExprUtil;
 import org.apache.impala.util.IcebergUtil;
@@ -111,6 +114,9 @@ public class IcebergScanPlanner {
   private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>();
   private Set<FileDescriptor> deleteFiles_ = new HashSet<>();
 
+  // Conjuncts on columns not involved in IDENTITY-partitioning.
+  private List<Expr> nonIdentityConjuncts_ = new ArrayList<>();
+
   // Statistics about the data and delete files. Useful for memory estimates of the
   // ANTI JOIN
   private long deletesRecordCount_ = 0;
@@ -143,7 +149,7 @@ public class IcebergScanPlanner {
       // If there are no delete files we can just create a single SCAN node.
       Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
       ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
-          aggInfo_, dataFilesWithoutDeletes_);
+          aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_);
       ret.init(analyzer_);
     } else {
       // Let's create a bit more complex plan in the presence of delete files.
@@ -172,7 +178,8 @@ public class IcebergScanPlanner {
 
   private PlanNode planWithoutIceberg() throws ImpalaException {
     PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
-        aggInfo_, getIceTable().getContentFileStore().getDataFiles());
+        aggInfo_, getIceTable().getContentFileStore().getDataFiles(),
+        nonIdentityConjuncts_);
     ret.init(analyzer_);
     return ret;
   }
@@ -187,7 +194,8 @@ public class IcebergScanPlanner {
     // If there are data files without corresponding delete files to be applied, we
     // can just create a SCAN node for these and do a UNION ALL with the ANTI JOIN.
     IcebergScanNode dataScanNode = new IcebergScanNode(
-      ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_);
+        ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_,
+        nonIdentityConjuncts_);
     dataScanNode.init(analyzer_);
     List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
         entry -> new SlotRef(entry)).collect(Collectors.toList());
@@ -217,13 +225,14 @@ public class IcebergScanPlanner {
     addDataVirtualPositionSlots(tblRef_);
     addDeletePositionSlots(deleteDeltaRef);
     IcebergScanNode dataScanNode = new IcebergScanNode(
-      dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_);
+        dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
+        nonIdentityConjuncts_);
     dataScanNode.init(analyzer_);
     IcebergScanNode deleteScanNode = new IcebergScanNode(
         deleteScanNodeId, deleteDeltaRef, /*conjuncts=*/Collections.emptyList(),
-        aggInfo_, Lists.newArrayList(deleteFiles_));
+        aggInfo_, Lists.newArrayList(deleteFiles_),
+        /*nonIdentityConjuncts=*/Collections.emptyList());
     deleteScanNode.init(analyzer_);
-    deleteScanNode.setCardinality(deletesRecordCount_);
 
     // Now let's create the JOIN node
     List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
@@ -249,6 +258,14 @@ public class IcebergScanPlanner {
       SingleNodePlanner.addSlotRefToDesc(analyzer_, rawPath);
       rawPath.remove(rawPath.size() - 1);
     }
+    for (SlotDescriptor insertSlotDesc : tblRef.getDesc().getSlots()) {
+      TVirtualColumnType virtColType = insertSlotDesc.getVirtualColumnType();
+      if (virtColType == TVirtualColumnType.INPUT_FILE_NAME) {
+        insertSlotDesc.setStats(virtualInputFileNameStats());
+      } else if (virtColType == TVirtualColumnType.FILE_POSITION) {
+        insertSlotDesc.setStats(virtualFilePositionStats());
+      }
+    }
   }
 
   private void addDeletePositionSlots(TableRef tblRef)
@@ -297,6 +314,18 @@ public class IcebergScanPlanner {
     return ret;
   }
 
+  private ColumnStats virtualInputFileNameStats() {
+    ColumnStats ret = new ColumnStats(Type.STRING);
+    ret.setNumDistinctValues(dataFilesWithDeletes_.size());
+    return ret;
+  }
+
+  private ColumnStats virtualFilePositionStats() {
+    ColumnStats ret = new ColumnStats(Type.BIGINT);
+    ret.setNumDistinctValues(deletesRecordCount_ / dataFilesWithDeletes_.size());
+    return ret;
+  }
+
   private void filterFileDescriptors() throws ImpalaException {
     TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
 
@@ -434,25 +463,49 @@ public class IcebergScanPlanner {
   private void extractIcebergConjuncts() throws ImpalaException {
     boolean isPartitionColumnIncluded = false;
     Map<SlotId, SlotDescriptor> idToSlotDesc = new HashMap<>();
+    Set<Expr> identityConjuncts = new HashSet<>();
     for (SlotDescriptor slotDesc : tblRef_.getDesc().getSlots()) {
       idToSlotDesc.put(slotDesc.getId(), slotDesc);
     }
     for (Expr expr : conjuncts_) {
       if (isPartitionColumnIncluded(expr, idToSlotDesc)) {
         isPartitionColumnIncluded = true;
-        break;
+        if (isIdentityPartitionIncluded(expr, idToSlotDesc)) {
+          identityConjuncts.add(expr);
+        }
       }
     }
     if (!isPartitionColumnIncluded) {
+      // No partition conjuncts, i.e. every conjunct is non-identity conjunct.
+      nonIdentityConjuncts_ = conjuncts_;
       return;
     }
     for (Expr expr : conjuncts_) {
-      tryConvertIcebergPredicate(expr);
+      if (tryConvertIcebergPredicate(expr)) {
+        if (!identityConjuncts.contains(expr)) {
+          nonIdentityConjuncts_.add(expr);
+        }
+      } else {
+        nonIdentityConjuncts_.add(expr);
+      }
     }
   }
 
   private boolean isPartitionColumnIncluded(Expr expr,
       Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    return hasPartitionTransformType(expr, idToSlotDesc,
+        transformType -> transformType != TIcebergPartitionTransformType.VOID);
+  }
+
+  private boolean isIdentityPartitionIncluded(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc) {
+    return hasPartitionTransformType(expr, idToSlotDesc,
+        transformType -> transformType == TIcebergPartitionTransformType.IDENTITY);
+  }
+
+  private boolean hasPartitionTransformType(Expr expr,
+      Map<SlotId, SlotDescriptor> idToSlotDesc,
+      Predicate<TIcebergPartitionTransformType> pred) {
     List<TupleId> tupleIds = Lists.newArrayList();
     List<SlotId> slotIds = Lists.newArrayList();
     expr.getIds(tupleIds, slotIds);
@@ -467,8 +520,11 @@ public class IcebergScanPlanner {
       if (col == null) continue;
       Preconditions.checkState(col instanceof IcebergColumn);
       IcebergColumn iceCol = (IcebergColumn)col;
-      if (IcebergUtil.isPartitionColumn(iceCol,
-          getIceTable().getDefaultPartitionSpec())) {
+      TIcebergPartitionTransformType transformType =
+          IcebergUtil.getPartitionTransformType(
+              iceCol,
+              getIceTable().getDefaultPartitionSpec());
+      if (pred.test(transformType)) {
         return true;
       }
     }
@@ -508,13 +564,15 @@ public class IcebergScanPlanner {
   /**
    * Transform impala predicate to iceberg predicate
    */
-  private void tryConvertIcebergPredicate(Expr expr)
+  private boolean tryConvertIcebergPredicate(Expr expr)
       throws ImpalaException {
     Expression predicate = convertIcebergPredicate(expr);
     if (predicate != null) {
       icebergPredicates_.add(predicate);
       LOG.debug("Push down the predicate: " + predicate + " to iceberg");
+      return true;
     }
+    return false;
   }
 
   private Expression convertIcebergPredicate(Expr expr)
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 3f9dc7314..6d08e4866 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -922,6 +922,7 @@ public class IcebergUtil {
     if (fileFormat != -1) {
       FbIcebergMetadata.addFileFormat(fbb, fileFormat);
     }
+    FbIcebergMetadata.addRecordCount(fbb, cf.recordCount());
     if (partKeysOffset != -1) {
       FbIcebergMetadata.addPartitionKeys(fbb, partKeysOffset);
     }
@@ -1015,14 +1016,26 @@ public class IcebergUtil {
     });
   }
 
-  public static boolean isPartitionColumn(IcebergColumn column,
+  /**
+   * Returns the partition transform type used for this column in the given spec.
+   * Returns TIcebergPartitionTransformType.VOID if the column is not used as a
+   * partitioning column.
+   */
+  public static TIcebergPartitionTransformType getPartitionTransformType(
+      IcebergColumn column,
       IcebergPartitionSpec spec) {
-    if (!spec.hasPartitionFields()) return false;
+    if (!spec.hasPartitionFields()) return TIcebergPartitionTransformType.VOID;
     for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
       if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
-      if (column.getFieldId() != partField.getSourceId()) continue;
-      return true;
+      if (column.getFieldId() == partField.getSourceId()) {
+        return partField.getTransformType();
+      }
     }
-    return false;
+    return TIcebergPartitionTransformType.VOID;
+  }
+
+  public static boolean isPartitionColumn(IcebergColumn column,
+      IcebergPartitionSpec spec) {
+    return getPartitionTransformType(column, spec) != TIcebergPartitionTransformType.VOID;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5f2383e48..d040a32d3 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1281,8 +1281,7 @@ public class PlannerTest extends PlannerTestBase {
   @Test
   public void testIcebergV2TableScans() {
     runPlannerTestFile("iceberg-v2-tables", "functional_parquet",
-        ImmutableSet.of(
-            PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS));
+        ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
 
   /**
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index c9bba8fdf..71c35a9c0 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -246,6 +246,11 @@ public class PlannerTestBase extends FrontendTestBase {
         // All partitions of insertTableId are okay.
         if (tableDesc.getId() == insertTableId) continue;
         if (!tableDesc.isSetHdfsTable()) continue;
+        // Iceberg partitions are handled differently, in Impala there's always a single
+        // HMS partition in an Iceberg table and actual partition/file pruning is
+        // handled by Iceberg. This means 'scanRangePartitions' can be empty while the
+        // descriptor table still has the single HMS partition.
+        if (tableDesc.isSetIcebergTable() && scanRangePartitions.isEmpty()) continue;
         THdfsTable hdfsTable = tableDesc.getHdfsTable();
         for (Map.Entry<Long, THdfsPartition> e :
              hdfsTable.getPartitions().entrySet()) {
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
index d9db7f81f..e02ca0d7f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test
@@ -51,7 +51,7 @@ PLAN-ROOT SINK
 |  row-size=8B cardinality=1
 |
 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  row-size=20B cardinality=4.73K
+|  row-size=20B cardinality=3
 |
 |--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete]
 |     HDFS partitions=1/1 files=1 size=1.54KB
@@ -214,7 +214,7 @@ PLAN-ROOT SINK
 |  row-size=36B cardinality=1
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -222,11 +222,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=1
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -239,7 +239,7 @@ PLAN-ROOT SINK
 |  row-size=36B cardinality=1
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -251,11 +251,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 SELECT * from iceberg_v2_positional_not_all_data_files_have_delete_files
 ---- PLAN
@@ -263,10 +263,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -274,11 +274,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -286,10 +286,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--06:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -301,11 +301,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 SELECT * from iceberg_v2_positional_update_all_rows
 ---- PLAN
@@ -313,10 +313,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=12
+|  row-size=36B cardinality=6
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=6
+|  |  row-size=36B cardinality=3
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
 |  |     HDFS partitions=1/1 files=1 size=2.60KB
@@ -324,11 +324,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=36B cardinality=6
+|     row-size=36B cardinality=3
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
    HDFS partitions=1/1 files=1 size=625B
-   row-size=36B cardinality=6
+   row-size=36B cardinality=3
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -336,10 +336,10 @@ PLAN-ROOT SINK
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=12
+|  row-size=36B cardinality=6
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
-|  |  row-size=36B cardinality=6
+|  |  row-size=36B cardinality=3
 |  |
 |  |--05:EXCHANGE [BROADCAST]
 |  |  |
@@ -349,11 +349,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=36B cardinality=6
+|     row-size=36B cardinality=3
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
    HDFS partitions=1/1 files=1 size=625B
-   row-size=36B cardinality=6
+   row-size=36B cardinality=3
 ====
 SELECT * from iceberg_v2_partitioned_position_deletes
 ---- PLAN
@@ -452,22 +452,22 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [LEFT ANTI JOIN]
 |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=16B cardinality=10
+|     row-size=16B cardinality=3
 |
 05:AGGREGATE [FINALIZE]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -475,11 +475,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -487,30 +487,30 @@ PLAN-ROOT SINK
 |
 07:HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
 |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s IS NOT DISTINCT FROM functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 |--12:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)]
 |  |
 |  06:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=16B cardinality=10
+|     row-size=16B cardinality=3
 |
 11:AGGREGATE [FINALIZE]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 10:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s)]
 |
 05:AGGREGATE [STREAMING]
 |  group by: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.i, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.s
-|  row-size=16B cardinality=20
+|  row-size=16B cardinality=10
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--09:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -522,11 +522,11 @@ PLAN-ROOT SINK
 |  |
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ====
 with v as (select i + 1000 as ii, upper(s) as ss from iceberg_v2_positional_not_all_data_files_have_delete_files)
 select * from v where ii > 1003;
@@ -592,7 +592,7 @@ PLAN-ROOT SINK
 11:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: i = max(i)
 |  runtime filters: RF000 <- max(i)
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--10:AGGREGATE [FINALIZE]
 |  |  output: max(i)
@@ -600,10 +600,10 @@ PLAN-ROOT SINK
 |  |
 |  09:UNION
 |  |  pass-through-operands: all
-|  |  row-size=24B cardinality=12
+|  |  row-size=24B cardinality=6
 |  |
 |  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  |  row-size=24B cardinality=6
+|  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--06:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows-POSITION-DELETE-06 functional_parquet.iceberg_v2_positional_update_all_rows-position-delete]
 |  |  |     HDFS partitions=1/1 files=1 size=2.60KB
@@ -611,18 +611,18 @@ PLAN-ROOT SINK
 |  |  |
 |  |  05:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |  |     HDFS partitions=1/1 files=1 size=625B
-|  |     row-size=24B cardinality=6
+|  |     row-size=24B cardinality=3
 |  |
 |  08:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=24B cardinality=6
+|     row-size=24B cardinality=3
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
 |  |     HDFS partitions=1/1 files=2 size=5.33KB
@@ -631,12 +631,12 @@ PLAN-ROOT SINK
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
 |     runtime filters: RF000 -> i
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    runtime filters: RF000 -> i
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -645,7 +645,7 @@ PLAN-ROOT SINK
 11:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: i = max(i)
 |  runtime filters: RF000 <- max(i)
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--17:EXCHANGE [BROADCAST]
 |  |
@@ -661,10 +661,10 @@ PLAN-ROOT SINK
 |  |
 |  09:UNION
 |  |  pass-through-operands: all
-|  |  row-size=24B cardinality=12
+|  |  row-size=24B cardinality=6
 |  |
 |  |--07:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
-|  |  |  row-size=24B cardinality=6
+|  |  |  row-size=24B cardinality=3
 |  |  |
 |  |  |--14:EXCHANGE [BROADCAST]
 |  |  |  |
@@ -674,18 +674,18 @@ PLAN-ROOT SINK
 |  |  |
 |  |  05:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |  |     HDFS partitions=1/1 files=1 size=625B
-|  |     row-size=24B cardinality=6
+|  |     row-size=24B cardinality=3
 |  |
 |  08:SCAN HDFS [functional_parquet.iceberg_v2_positional_update_all_rows]
 |     HDFS partitions=1/1 files=1 size=625B
-|     row-size=24B cardinality=6
+|     row-size=24B cardinality=3
 |
 04:UNION
 |  pass-through-operands: all
-|  row-size=36B cardinality=20
+|  row-size=36B cardinality=10
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, PARTITIONED]
-|  |  row-size=36B cardinality=10
+|  |  row-size=36B cardinality=6
 |  |
 |  |--13:EXCHANGE [HASH(functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos,functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path)]
 |  |  |
@@ -698,10 +698,205 @@ PLAN-ROOT SINK
 |  00:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
 |     HDFS partitions=1/1 files=2 size=1.22KB
 |     runtime filters: RF000 -> i
-|     row-size=36B cardinality=10
+|     row-size=36B cardinality=6
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]
    HDFS partitions=1/1 files=2 size=1.22KB
    runtime filters: RF000 -> i
-   row-size=36B cardinality=10
+   row-size=36B cardinality=4
+====
+select * from iceberg_v2_partitioned_position_deletes where action = 'download';
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=64B cardinality=6
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: action = 'download'
+   row-size=64B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=64B cardinality=6
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: action = 'download'
+   row-size=64B cardinality=6
+====
+select * from iceberg_v2_partitioned_position_deletes
+where action = 'download' and user = 'Lisa';
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=64B cardinality=1
+|
+|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: `user` = 'Lisa', action = 'download'
+   row-size=64B cardinality=1
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=64B cardinality=1
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete]
+|     HDFS partitions=1/1 files=1 size=3.18KB
+|     row-size=185B cardinality=2
+|
+00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes]
+   HDFS partitions=1/1 files=1 size=1.17KB
+   predicates: `user` = 'Lisa', action = 'download'
+   row-size=64B cardinality=1
+====
+select event_time, action from iceberg_partitioned where action = 'click' or action = 'view';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where action in ('click', 'view');
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
+   row-size=28B cardinality=6
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=6 size=6.85KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click'
+   row-size=28B cardinality=6
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view'
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view'
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action in ('click', 'view');
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')
+   row-size=28B cardinality=14
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=14 size=15.93KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')
+   row-size=28B cardinality=14
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action > 'a';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
+   row-size=28B cardinality=20
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=20 size=22.90KB
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a'
+   row-size=28B cardinality=20
+====
+select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00';
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=0 size=0B
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   row-size=28B cardinality=0
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partitioned]
+   HDFS partitions=1/1 files=0 size=0B
+   predicates: event_time = TIMESTAMP '2020-01-01 11:00:00'
+   row-size=28B cardinality=0
 ====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 56b9ae535..201f72371 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -263,7 +263,7 @@ PLAN-ROOT SINK
      columns: unavailable
    extrapolated-rows=disabled max-scan-range-rows=6
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=44B cardinality=2
+   tuple-ids=0 row-size=44B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sampling Iceberg tables. Count(*) is not optimized.
@@ -288,7 +288,7 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=6
    mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1
-   tuple-ids=0 row-size=0B cardinality=2
+   tuple-ids=0 row-size=0B cardinality=3
    in pipelines: 00(GETNEXT)
 ====
 # Sampling partitioned Iceberg tables.
@@ -331,7 +331,7 @@ PLAN-ROOT SINK
    parquet statistics predicates: action = 'click'
    parquet dictionary predicates: action = 'click'
    mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-   tuple-ids=0 row-size=44B cardinality=1
+   tuple-ids=0 row-size=44B cardinality=4
    in pipelines: 00(GETNEXT)
 ====
 # Sampling Iceberg V2 tables. Delete files are not sampled, only the data files. So we
@@ -348,13 +348,13 @@ PLAN-ROOT SINK
 04:UNION
 |  pass-through-operands: all
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
-|  tuple-ids=0 row-size=36B cardinality=4.85K
+|  tuple-ids=0 row-size=36B cardinality=4
 |  in pipelines: 03(GETNEXT), 00(GETNEXT)
 |
 |--02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
 |  |  hash predicates: functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.file__position = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.pos, functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files.input__file__name = functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete.file_path
 |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
-|  |  tuple-ids=0 row-size=36B cardinality=2.42K
+|  |  tuple-ids=0 row-size=36B cardinality=3
 |  |  in pipelines: 00(GETNEXT), 01(OPEN)
 |  |
 |  |--01:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-POSITION-DELETE-01 functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files-position-delete]
@@ -374,7 +374,7 @@ PLAN-ROOT SINK
 |       columns missing stats: i, s
 |     extrapolated-rows=disabled max-scan-range-rows=10
 |     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
-|     tuple-ids=0 row-size=36B cardinality=1
+|     tuple-ids=0 row-size=36B cardinality=3
 |     in pipelines: 00(GETNEXT)
 |
 03:SCAN HDFS [functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files]


[impala] 01/02: IMPALA-11339: Add Iceberg LOAD DATA INPATH statement

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05a4b778d395c8813988610b78b71bcd920be037
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Mon Nov 28 12:07:00 2022 +0100

    IMPALA-11339: Add Iceberg LOAD DATA INPATH statement
    
    Extend the LOAD DATA INPATH statement to support Iceberg tables.
    Iceberg data files need Iceberg field ids, which files written for
    native Parquet tables lack; therefore, to add files, this change
    uses child queries to load and rewrite the data. The child queries
    create, insert from, and then drop a temporary table over the
    specified directory.
    
    The create part depends on LIKE PARQUET/ORC clauses to infer the file
    format. This requires identifying a file in the directory and using that
    to create the temporary table.
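
    For illustration, the generated child queries follow roughly this
    shape (the table and path names here are hypothetical; the real
    statements are built by the new QueryStringBuilder class and the
    location placeholders are only filled in at execution time):

        // Hypothetical illustration of the three child queries.
        String tmpTbl = "ice_tbl_load_tmp";
        String createTmpTblQuery = "CREATE TABLE " + tmpTbl
            + " LIKE PARQUET '<staging-dir>/file_0.parq'"
            + " STORED AS PARQUET LOCATION '<staging-dir>'";
        String insertIntoDstTblQuery = "INSERT INTO ice_tbl SELECT * FROM " + tmpTbl;
        String dropTmpTblQuery = "DROP TABLE " + tmpTbl;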
    
    The target file or directory is moved to a staging directory before
    ingestion, similarly to native file formats. In case of a query
    failure the files are moved back to their original location. The
    child query executor returns the error message of the failing query,
    and the child query profiles are available through the WebUI.
    
    At this point the PARTITION clause is not supported because it would
    require analysis of the PartitionSpec (IMPALA-11750).
    
    Testing:
     - Added e2e tests
     - Added fe unit tests
    
    Change-Id: I8499945fa57ea0499f65b455976141dcd6d789eb
    Reviewed-on: http://gerrit.cloudera.org:8080/19145
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |  53 +++++++
 be/src/service/client-request-state.h              |   3 +
 common/thrift/Frontend.thrift                      |  23 +++
 .../org/apache/impala/analysis/LoadDataStmt.java   | 105 ++++++++++++--
 .../apache/impala/analysis/QueryStringBuilder.java | 155 +++++++++++++++++++++
 .../org/apache/impala/common/FileSystemUtil.java   |  22 ++-
 .../java/org/apache/impala/service/Frontend.java   |  50 ++++++-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  16 +++
 .../queries/QueryTest/iceberg-load.test            | 142 +++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  52 ++++++-
 10 files changed, 607 insertions(+), 14 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index f03a454f1..a641600b9 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -789,6 +789,9 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   TLoadDataResp response;
   Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
+  if (exec_request_->load_data_request.iceberg_tbl) {
+    ExecLoadIcebergDataRequestImpl(response);
+  }
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -838,6 +841,56 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   }
 }
 
+void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
+  TLoadDataReq load_data_req = exec_request_->load_data_request;
+  RuntimeProfile* child_profile =
+      RuntimeProfile::Create(&profile_pool_, "Child Queries");
+  profile_->AddChild(child_profile);
+  // Add child queries for computing table and column stats.
+  vector<ChildQuery> child_queries;
+  // Prepare CREATE
+  RuntimeProfile* create_profile =
+      RuntimeProfile::Create(&profile_pool_, "Create table query");
+  child_profile->AddChild(create_profile);
+  child_queries.emplace_back(response.create_tmp_tbl_query, this, parent_server_,
+      create_profile, &profile_pool_);
+  // Prepare INSERT
+  RuntimeProfile* insert_profile =
+      RuntimeProfile::Create(&profile_pool_, "Insert query");
+  child_profile->AddChild(insert_profile);
+  child_queries.emplace_back(load_data_req.insert_into_dst_tbl_query, this,
+      parent_server_, insert_profile, &profile_pool_);
+  // Prepare DROP
+  RuntimeProfile* drop_profile =
+      RuntimeProfile::Create(&profile_pool_, "Drop table query");
+  child_profile->AddChild(drop_profile);
+  child_queries.emplace_back(load_data_req.drop_tmp_tbl_query, this,
+      parent_server_, drop_profile, &profile_pool_);
+  // Execute queries
+  RETURN_VOID_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
+  vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>();
+  Status query_status = child_query_executor_->WaitForAll(completed_queries);
+  hdfsFS fs = hdfsConnect("default", 0);
+  if (query_status.ok()) {
+    if (hdfsDelete(fs, response.create_location.c_str(), 1)) {
+      Status hdfs_ret("Failed to remove staging data under '" + response.create_location
+          + "' after query failure: " + query_status.msg().msg());
+      lock_guard<mutex> l(lock_);
+      RETURN_VOID_IF_ERROR(UpdateQueryStatus(hdfs_ret));
+    }
+  } else {
+    for (string path : response.loaded_files) {
+      if (hdfsMove(fs, path.c_str(), fs, load_data_req.source_path.c_str())) {
+        Status hdfs_ret("Failed to revert data movement, some files might still be in '"
+            + response.create_location + "' after query failure: "
+            + query_status.msg().msg());
+        lock_guard<mutex> l(lock_);
+        RETURN_VOID_IF_ERROR(UpdateQueryStatus(hdfs_ret));
+      }
+    }
+  }
+}
+
 
 Status ClientRequestState::ExecLoadDataRequest() {
   if (exec_request_->query_options.enable_async_load_data_execution) {
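
The cleanup performed by ExecLoadIcebergDataRequestImpl() above is easy to summarize:
if the child queries succeed, the staging directory is removed; if they fail, the files
that were already moved are returned to the source path so the statement can be retried.
The snippet below is only a hedged Java rendering of that idea using the Hadoop
FileSystem API; stagingDir, loadedFiles, sourceDir and queryOk are hypothetical
stand-ins for the values the backend actually holds.

  // Sketch only: conceptual Java version of the backend's post-LOAD cleanup for
  // Iceberg tables. All names are hypothetical; the real logic is the C++ code above.
  import java.io.IOException;
  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class IcebergLoadCleanupSketch {
    static void cleanup(boolean queryOk, Path stagingDir, List<Path> loadedFiles,
        Path sourceDir) throws IOException {
      FileSystem fs = stagingDir.getFileSystem(new Configuration());
      if (queryOk) {
        // The data is now in the Iceberg table, so the staged copies can go.
        fs.delete(stagingDir, /*recursive=*/true);
      } else {
        // Move the staged files back so the user can re-issue LOAD DATA INPATH.
        for (Path staged : loadedFiles) {
          fs.rename(staged, new Path(sourceDir, staged.getName()));
        }
      }
    }
  }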
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 69430df16..c0458c72b 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -716,6 +716,9 @@ class ClientRequestState {
   /// Core logic of executing a load data statement.
   void ExecLoadDataRequestImpl(bool exec_in_worker_thread);
 
+  /// Executes LOAD DATA related sub-queries for Iceberg tables.
+  void ExecLoadIcebergDataRequestImpl(TLoadDataResp response);
+
   /// Executes a shut down request.
   Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 23ffd69ba..d4aa0104a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -372,6 +372,22 @@ struct TLoadDataReq {
   // An optional partition spec. Set if this operation should apply to a specific
   // partition rather than the base table.
   4: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
+
+  // True if the destination table is an Iceberg table; in this case the data has to be
+  // inserted into the Iceberg table based on the given files.
+  5: optional bool iceberg_tbl
+
+  // For Iceberg data load. Query template to create a temporary table with its location
+  // pointing to the new files. The table location is unknown during planning, so the
+  // placeholders are filled in during execution.
+  6: optional string create_tmp_tbl_query_template
+
+  // For Iceberg data load. Query to insert into the destination table from the
+  // temporary table.
+  7: optional string insert_into_dst_tbl_query
+
+  // For Iceberg data load. Query to drop the temporary table.
+  8: optional string drop_tmp_tbl_query
 }
 
 // Response of a LOAD DATA statement.
@@ -385,6 +401,13 @@ struct TLoadDataResp {
 
   // This is needed to issue TUpdateCatalogRequest
   3: string partition_name = ""
+
+  // For Iceberg data load. The CREATE TABLE query, i.e. the template after the required
+  // fields have been substituted.
+  4: optional string create_tmp_tbl_query
+
+  // For Iceberg data load. The temporary table location, used to restore data in case of
+  // query failure.
+  5: optional string create_location
 }
 
 enum TCatalogOpType {
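
For illustration, the sketch below shows how a frontend component might populate the new
TLoadDataReq fields. The query strings and table names are made up for this example (the
real ones are generated in LoadDataStmt.analyzeLoadIntoIcebergTable()), and the
pre-existing required fields (table name, source path, overwrite) are omitted.

  // Hypothetical example of the new Iceberg-related TLoadDataReq fields.
  import org.apache.impala.thrift.TLoadDataReq;

  public class IcebergLoadReqSketch {
    static TLoadDataReq exampleRequest() {
      TLoadDataReq req = new TLoadDataReq();
      req.setIceberg_tbl(true);
      // The two %s placeholders are filled by the backend once the files are staged:
      // a data file for LIKE schema inference, and the staging directory location.
      req.setCreate_tmp_tbl_query_template("CREATE EXTERNAL TABLE mydb.ice_tbl_tmp1a2b3c4d"
          + " LIKE PARQUET '%s' STORED AS PARQUET LOCATION '%s'");
      req.setInsert_into_dst_tbl_query(
          "INSERT INTO mydb.ice_tbl SELECT * FROM mydb.ice_tbl_tmp1a2b3c4d");
      req.setDrop_tmp_tbl_query("DROP TABLE mydb.ice_tbl_tmp1a2b3c4d");
      return req;
    }
  }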
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 4ff13ec9e..d23d1951d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -20,18 +20,24 @@ package org.apache.impala.analysis;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.FsPermissionChecker;
+import org.apache.orc.OrcFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 
 import com.google.common.base.Preconditions;
 
@@ -48,6 +54,14 @@ import com.google.common.base.Preconditions;
  * (preserving the extension).
  * Loading hidden files is not supported and any hidden files in the source or
  * destination are preserved, even if OVERWRITE is true.
+ *
+ * Notes on Iceberg data loading:
+ * Iceberg data files carry specific field ids, therefore plain data files cannot be
+ * added to the data directory by simply moving them. To rewrite the files with the
+ * necessary metadata, the LOAD DATA operation is executed as 3 sub-queries:
+ *  1. CREATE a temporary table over the staged files
+ *  2. INSERT INTO the target table from the temporary table
+ *  3. DROP the temporary table
  */
 public class LoadDataStmt extends StatementBase {
   private final TableName tableName_;
@@ -57,6 +71,10 @@ public class LoadDataStmt extends StatementBase {
 
   // Set during analysis
   private String dbName_;
+  private FeTable table_;
+  private String createTmpTblQuery_;
+  private String insertTblQuery_;
+  private String dropTmpTblQuery_;
 
   public LoadDataStmt(TableName tableName, HdfsUri sourceDataPath, boolean overwrite,
       PartitionSpec partitionSpec) {
@@ -99,27 +117,35 @@ public class LoadDataStmt extends StatementBase {
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     dbName_ = analyzer.getTargetDbName(tableName_);
-    FeTable table = analyzer.getTable(tableName_, Privilege.INSERT);
-    if (!(table instanceof FeFsTable)) {
+    table_ = analyzer.getTable(tableName_, Privilege.INSERT);
+    if (!(table_ instanceof FeFsTable)) {
       throw new AnalysisException("LOAD DATA only supported for HDFS tables: " +
           dbName_ + "." + getTbl());
     }
-    analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
-    analyzer.ensureTableNotTransactional(table, "LOAD DATA");
+    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    analyzer.ensureTableNotTransactional(table_, "LOAD DATA");
 
     // Analyze the partition spec, if one was specified.
     if (partitionSpec_ != null) {
-      partitionSpec_.setTableName(tableName_);
-      partitionSpec_.setPartitionShouldExist();
-      partitionSpec_.setPrivilegeRequirement(Privilege.INSERT);
-      partitionSpec_.analyze(analyzer);
+      if (table_ instanceof FeIcebergTable) {
+        throw new AnalysisException("PARTITION clause is not supported for Iceberg " +
+            "tables.");
+      } else {
+        partitionSpec_.setTableName(tableName_);
+        partitionSpec_.setPartitionShouldExist();
+        partitionSpec_.setPrivilegeRequirement(Privilege.INSERT);
+        partitionSpec_.analyze(analyzer);
+      }
     } else {
-      if (table.getMetaStoreTable().getPartitionKeysSize() > 0) {
+      if (table_.getMetaStoreTable().getPartitionKeysSize() > 0) {
         throw new AnalysisException("Table is partitioned but no partition spec was " +
             "specified: " + dbName_ + "." + getTbl());
       }
     }
-    analyzePaths(analyzer, (FeFsTable) table);
+    analyzePaths(analyzer, (FeFsTable) table_);
+    if (table_ instanceof FeIcebergTable) {
+      analyzeLoadIntoIcebergTable();
+    }
   }
 
   /**
@@ -207,6 +233,59 @@ public class LoadDataStmt extends StatementBase {
     }
   }
 
+  /**
+   * Creates the child queries that are used to load data into Iceberg tables:
+   *   1. a temporary table is created whose location points to the staged data directory
+   *   2. an INSERT query copies the data from the temporary table into the destination
+   *      table
+   *   3. a DROP TABLE statement removes the temporary table
+   */
+  private void analyzeLoadIntoIcebergTable() throws AnalysisException {
+    Path sourcePath = sourceDataPath_.getPath();
+    String tmpTableName = dbName_ + "." + tableName_ + "_tmp" +
+        UUID.randomUUID().toString().substring(0, 8);
+    QueryStringBuilder.Create createTableQueryBuilder =
+        new QueryStringBuilder.Create().table(tmpTableName, true);
+    try {
+      FileSystem fs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
+      Path filePathForLike = sourcePath;
+      // The LIKE <file format> clause needs a file to infer the schema from; for
+      // directories, list the files under the directory and pick the first one.
+      if (fs.isDirectory(sourcePath)) {
+        RemoteIterator<? extends FileStatus> fileStatuses = FileSystemUtil.listFiles(fs,
+            sourcePath, true, "");
+        while (fileStatuses.hasNext()) {
+          FileStatus fileStatus = fileStatuses.next();
+          if (fileStatus.isFile()) {
+            filePathForLike = fileStatus.getPath();
+            break;
+          }
+        }
+      }
+      String magicString = FileSystemUtil.readMagicString(filePathForLike);
+      if (magicString.substring(0, 4).equals(ParquetFileWriter.MAGIC_STR)) {
+        createTableQueryBuilder.like("PARQUET", "%s").storedAs("PARQUET");
+      } else if (magicString.substring(0, 3).equals(OrcFile.MAGIC)) {
+        createTableQueryBuilder.like("ORC", "%s").storedAs("ORC");
+      } else {
+        throw new AnalysisException(String.format("INPATH contains unsupported LOAD "
+            + "format, file '%s' has '%s' magic string.", filePathForLike, magicString));
+      }
+      createTableQueryBuilder.tableLocation("%s");
+    } catch (IOException e) {
+      throw new AnalysisException("Failed to generate CREATE TABLE subquery "
+          + "statement. ", e);
+    }
+    createTmpTblQuery_ = createTableQueryBuilder.build();
+    QueryStringBuilder.Insert insertTblQueryBuilder =
+        new QueryStringBuilder.Insert().overwrite(overwrite_)
+        .table(tableName_.toString());
+    QueryStringBuilder.Select insertSelectTblQueryBuilder =
+        new QueryStringBuilder.Select().selectList("*").from(tmpTableName);
+    insertTblQueryBuilder.select(insertSelectTblQueryBuilder);
+    insertTblQuery_ = insertTblQueryBuilder.build();
+    dropTmpTblQuery_ = new QueryStringBuilder.Drop().table(tmpTableName).build();
+  }
+
   public TLoadDataReq toThrift() {
     TLoadDataReq loadDataReq = new TLoadDataReq();
     loadDataReq.setTable_name(new TTableName(getDb(), getTbl()));
@@ -215,6 +294,12 @@ public class LoadDataStmt extends StatementBase {
     if (partitionSpec_ != null) {
       loadDataReq.setPartition_spec(partitionSpec_.toThrift());
     }
+    if (table_ instanceof FeIcebergTable) {
+      loadDataReq.setIceberg_tbl(true);
+      loadDataReq.setCreate_tmp_tbl_query_template(createTmpTblQuery_);
+      loadDataReq.setInsert_into_dst_tbl_query(insertTblQuery_);
+      loadDataReq.setDrop_tmp_tbl_query(dropTmpTblQuery_);
+    }
     return loadDataReq;
   }
 }
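
The format sniffing in analyzeLoadIntoIcebergTable() boils down to comparing the first
bytes of one staged file against the Parquet ("PAR1") and ORC ("ORC") magic strings and
choosing the LIKE/STORED AS clauses accordingly. A standalone, hedged sketch of that
decision is shown below; detectFormat() is a hypothetical helper written for this
explanation, not code from the patch (which uses ParquetFileWriter.MAGIC_STR and
OrcFile.MAGIC together with FileSystemUtil.readMagicString()).

  import java.io.IOException;
  import java.io.InputStream;
  import java.nio.charset.StandardCharsets;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Hypothetical helper illustrating the LIKE file-format decision.
  public class FileFormatSniffer {
    static String detectFormat(Path file) throws IOException {
      FileSystem fs = file.getFileSystem(new Configuration());
      byte[] buf = new byte[4];
      try (InputStream in = fs.open(file)) {
        in.read(buf);  // a shorter file simply won't match either magic string
      }
      String magic = new String(buf, StandardCharsets.UTF_8);
      if (magic.startsWith("PAR1")) return "PARQUET";
      if (magic.startsWith("ORC")) return "ORC";
      throw new IOException("Unsupported file format, magic string: " + magic);
    }
  }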
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
new file mode 100644
index 000000000..05131b69e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+/**
+ * This class standardizes the query string building process. At this point it is only
+ * used to create the child queries of Iceberg LOAD DATA INPATH statements. Each inner
+ * class is responsible for a specific query type, while the outer class merely serves
+ * as a namespace. The methods of the inner classes are chainable.
+ */
+public class QueryStringBuilder {
+
+  public static class Create {
+    private String tableName_;
+    private Boolean external_;
+    private Boolean like_;
+    private String likeFileFormat_;
+    private String likeLocation_;
+    private String storedAsFileFormat_;
+    private String tableLocation_;
+
+    public Create() {}
+
+    public Create table(String tableName, Boolean external) {
+      tableName_ = tableName;
+      external_ = external;
+      return this;
+    }
+
+    public Create like(String fileFormat, String location) {
+      like_ = true;
+      likeFileFormat_ = fileFormat.toUpperCase();
+      likeLocation_ = location;
+      return this;
+    }
+
+    public Create storedAs(String fileFormat) {
+      storedAsFileFormat_ = fileFormat.toUpperCase();
+      return this;
+    }
+
+    public Create tableLocation(String location) {
+      tableLocation_ = location;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      if (!external_) {
+        builder.append("CREATE TABLE ");
+      } else {
+        builder.append("CREATE EXTERNAL TABLE ");
+      }
+      builder.append(tableName_ + " ");
+      if (like_) {
+        builder.append("LIKE " + likeFileFormat_ + " '" + likeLocation_ + "' ");
+      }
+      builder.append("STORED AS " + storedAsFileFormat_ + " ");
+      builder.append("LOCATION '" + tableLocation_ + "'");
+      return builder.toString();
+    }
+  }
+
+  public static class Insert {
+    private String tableName_;
+    private Boolean overwrite_;
+    private Select select_;
+
+    public Insert() {}
+
+    public Insert table(String tableName) {
+      tableName_ = tableName + " ";
+      return this;
+    }
+
+    public Insert overwrite(Boolean overwrite) {
+      overwrite_ = overwrite;
+      return this;
+    }
+
+    public Insert select(Select select) {
+      select_ = select;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      if (overwrite_) {
+        builder.append("INSERT OVERWRITE ");
+      } else {
+        builder.append("INSERT INTO ");
+      }
+      builder.append(tableName_);
+      builder.append(select_.build());
+      return builder.toString();
+    }
+  }
+
+  public static class Select {
+    private String selectList_;
+    private String tableName_;
+
+    public Select() {}
+
+    public Select selectList(String selectList) {
+      selectList_ = selectList;
+      return this;
+    }
+
+    public Select from(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SELECT " + selectList_ + " ");
+      builder.append("FROM " + tableName_ + " ");
+      return builder.toString();
+    }
+  }
+
+  public static class Drop {
+    private String tableName_;
+
+    public Drop() {}
+
+    public Drop table(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("DROP TABLE ");
+      builder.append(tableName_);
+      return builder.toString();
+    }
+  }
+}
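
A short usage example may help readers of QueryStringBuilder. The sketch below mirrors
what LoadDataStmt.analyzeLoadIntoIcebergTable() builds, but with the %s placeholders
already replaced by concrete values; the table and path names are invented for this
example.

  import org.apache.impala.analysis.QueryStringBuilder;

  public class QueryStringBuilderExample {
    public static void main(String[] args) {
      String create = new QueryStringBuilder.Create()
          .table("mydb.ice_tbl_tmp1a2b3c4d", true)  // true -> CREATE EXTERNAL TABLE
          .like("PARQUET", "/warehouse/mydb.db/ice_tbl/staging_tmp/file1.parquet")
          .storedAs("PARQUET")
          .tableLocation("/warehouse/mydb.db/ice_tbl/staging_tmp")
          .build();
      // CREATE EXTERNAL TABLE mydb.ice_tbl_tmp1a2b3c4d LIKE PARQUET '...'
      //   STORED AS PARQUET LOCATION '...'
      String insert = new QueryStringBuilder.Insert()
          .overwrite(false)
          .table("mydb.ice_tbl")
          .select(new QueryStringBuilder.Select()
              .selectList("*").from("mydb.ice_tbl_tmp1a2b3c4d"))
          .build();
      // INSERT INTO mydb.ice_tbl SELECT * FROM mydb.ice_tbl_tmp1a2b3c4d
      String drop = new QueryStringBuilder.Drop().table("mydb.ice_tbl_tmp1a2b3c4d").build();
      // DROP TABLE mydb.ice_tbl_tmp1a2b3c4d
      System.out.println(create + "\n" + insert + "\n" + drop);
    }
  }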
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2bd64549a..88401d203 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -303,8 +303,9 @@ public class FileSystemUtil {
    * destination location. Instead, a UUID will be appended to the base file name,
    * preserving the existing file extension. If renameIfAlreadyExists is false, an
    * IOException will be thrown if there is a file name conflict.
+   * Returns the Path that points to the destination file.
    */
-  public static void relocateFile(Path sourceFile, Path dest,
+  public static Path relocateFile(Path sourceFile, Path dest,
       boolean renameIfAlreadyExists) throws IOException {
     FileSystem destFs = dest.getFileSystem(CONF);
 
@@ -338,7 +339,7 @@ public class FileSystemUtil {
         throw new IOException(String.format(
             "Failed to move '%s' to '%s'", sourceFile, destFile));
       }
-      return;
+      return destFile;
     }
     Preconditions.checkState(!doRename);
     if (destIsDfs && sameBucket) {
@@ -355,6 +356,7 @@ public class FileSystemUtil {
     }
     FileSystem sourceFs = sourceFile.getFileSystem(CONF);
     FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF);
+    return destFile;
   }
 
   /**
@@ -370,6 +372,22 @@ public class FileSystemUtil {
     }
   }
 
+  /**
+   * Reads the first 4 bytes of the file and returns them as a string. This is used to
+   * identify the file format by its magic string.
+   */
+  public static String readMagicString(Path file) throws IOException {
+    FileSystem fs = file.getFileSystem(CONF);
+    InputStream fileStream = fs.open(file);
+    byte[] buffer = new byte[4];
+    try {
+      IOUtils.read(fileStream, buffer, 0, 4);
+      return new String(buffer);
+    } finally {
+      IOUtils.closeQuietly(fileStream);
+    }
+  }
+
   /**
    * Builds a new file name based on a base file name. This is done by inserting
    * the given appendStr into the base file name, preserving the file extension (if
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 398f1e123..c28398e77 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -857,7 +857,10 @@ public class Frontend {
         String.format("load table data %s.%s", tableName.db_name, tableName.table_name));
     while (true) {
       try {
-        return doLoadTableData(request);
+        if (!request.iceberg_tbl)
+          return doLoadTableData(request);
+        else
+          return doLoadIcebergTableData(request);
       } catch(InconsistentMetadataFetchException e) {
         retries.handleRetryOrThrow(e);
       }
@@ -925,6 +928,51 @@ public class Frontend {
     return response;
   }
 
+  private TLoadDataResp doLoadIcebergTableData(TLoadDataReq request)
+      throws ImpalaException, IOException {
+    TLoadDataResp response = new TLoadDataResp();
+    TableName tableName = TableName.fromThrift(request.getTable_name());
+    FeCatalog catalog = getCatalog();
+    String destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl())
+        .getMetaStoreTable().getSd().getLocation();
+    Path destPath = new Path(destPathString);
+    Path sourcePath = new Path(request.source_path);
+    FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
+
+    // Create a temporary directory within the final destination directory to stage the
+    // file move.
+    Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);
+
+    int numFilesLoaded = 0;
+    List<Path> filesLoaded = new ArrayList<>();
+    String destFile;
+    if (sourceFs.isDirectory(sourcePath)) {
+      numFilesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath,
+          tmpDestPath, filesLoaded);
+      destFile = filesLoaded.get(0).toString();
+    } else {
+      Path destFilePath = FileSystemUtil.relocateFile(sourcePath, tmpDestPath,
+          true);
+      filesLoaded.add(new Path(tmpDestPath.toString() + Path.SEPARATOR
+          + sourcePath.getName()));
+      numFilesLoaded = 1;
+      destFile = destFilePath.toString();
+    }
+
+    String createTmpTblQuery = String.format(request.create_tmp_tbl_query_template,
+        destFile, tmpDestPath.toString());
+
+    TColumnValue col = new TColumnValue();
+    String loadMsg = String.format("Loaded %d file(s).", numFilesLoaded);
+    col.setString_val(loadMsg);
+    response.setLoaded_files(filesLoaded.stream().map(Path::toString)
+        .collect(Collectors.toList()));
+    response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
+    response.setCreate_tmp_tbl_query(createTmpTblQuery);
+    response.setCreate_location(tmpDestPath.toString());
+    return response;
+  }
+
   /**
    * Parses and plans a query in order to generate its explain string. This method does
    * not increase the query id counter.
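
To make the placeholder substitution in doLoadIcebergTableData() concrete, the hedged
snippet below shows the same String.format() step with hypothetical values; the template
text and HDFS locations are assumptions chosen for this example only.

  public class TemplateSubstitutionExample {
    public static void main(String[] args) {
      String template = "CREATE EXTERNAL TABLE mydb.ice_tbl_tmp1a2b3c4d "
          + "LIKE PARQUET '%s' STORED AS PARQUET LOCATION '%s'";
      // First %s: one staged data file, used by LIKE for schema inference.
      String destFile =
          "hdfs://nn:8020/warehouse/mydb.db/ice_tbl/staging_tmp/file1.parquet";
      // Second %s: the staging directory that becomes the temporary table's location.
      String tmpDestPath = "hdfs://nn:8020/warehouse/mydb.db/ice_tbl/staging_tmp";
      String createTmpTblQuery = String.format(template, destFile, tmpDestPath);
      System.out.println(createTmpTblQuery);
    }
  }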
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 9fbbd1d72..15cc8a77b 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4025,6 +4025,22 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     }
   }
 
+  @Test
+  public void TestIcebergLoadData() throws AnalysisException {
+    AnalyzesOk("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_non_partitioned/data' into table "
+        + "functional_parquet.iceberg_non_partitioned");
+    AnalyzesOk("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_non_partitioned/data' overwrite into "
+        + "table functional_parquet.iceberg_non_partitioned");
+    AnalysisError("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_partitioned/data/"
+        + "event_time_hour=2020-01-01-08/action=view/' into table "
+        + "functional_parquet.iceberg_partitioned partition "
+        + "(event_time_hour='2020-01-01-08', action='view');", "PARTITION clause is not "
+        + "supported for Iceberg tables.");
+  }
+
   @Test
   public void TestInsert() throws AnalysisException {
     for (String qualifier: ImmutableList.of("INTO", "OVERWRITE")) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test
new file mode 100644
index 000000000..a0c87aa65
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test
@@ -0,0 +1,142 @@
+====
+---- QUERY
+create table test_iceberg_load_parquet like iceberg_mixed_file_format_test
+stored as iceberg;
+====
+---- QUERY
+# Test 1-2: Recovery from child query failure, first file then directory location
+set mem_limit=1;
+load data inpath '/tmp/$DATABASE/parquet/00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-fff150b6136a-job_16619542960420_0002-1-00001.parquet'
+into table test_iceberg_load_parquet;
+---- CATCH
+minimum memory reservation is greater than memory available to the query for buffer reservations
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+set mem_limit=1;
+load data inpath '/tmp/$DATABASE/parquet/' into table test_iceberg_load_parquet;
+---- CATCH
+minimum memory reservation is greater than memory available to the query for buffer reservations
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 3-4: Load a parquet file then a directory into test table
+load data inpath '/tmp/$DATABASE/parquet/00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-fff150b6136a-job_16619542960420_0002-1-00001.parquet'
+into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/parquet/' into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 5-6: Load an orc file then a directory into test table
+create table test_iceberg_load_orc like iceberg_mixed_file_format_test
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/orc/00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-ee500a19c1d1-job_16619542960420_0003-1-00001.orc'
+into table test_iceberg_load_orc;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_orc;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/orc/' into table test_iceberg_load_orc;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_orc;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 7: Overwrite the existing table from Test 1
+load data inpath '/tmp/$DATABASE/overwrite/' overwrite into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 8: Mismatching target table and file schema
+create table test_iceberg_load_schema_mismatch (i int)
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/mismatching_schema/' overwrite into table test_iceberg_load_schema_mismatch;
+---- CATCH
+AnalysisException: Target table 'test_load_a61184e9.test_iceberg_load_schema_mismatch' has fewer columns (1) than the SELECT / VALUES clause returns (4)
+====
+---- QUERY
+# Test 9: Partitioned Iceberg table
+create table test_iceberg_load_partitioned like functional_parquet.iceberg_partitioned
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/partitioned/' overwrite into table test_iceberg_load_partitioned;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_partitioned;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 07e3f309c..38b246637 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -31,7 +31,7 @@ import json
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
@@ -792,6 +792,56 @@ class TestIcebergTable(IcebergTestSuite):
     self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
                       unique_database)
 
+  @SkipIfLocal.hdfs_client
+  def test_load(self, vector, unique_database):
+    """Test LOAD DATA INPATH for Iceberg tables, the first part of this method inits the
+    target directory, copies existing test data to HDFS. The second part runs the test
+    cases then cleans up the test directory.
+    """
+    # Test 1-6 init: target orc/parquet file and directory
+    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'],
+        "testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}")
+    DST_DIR = "/tmp/" + unique_database + "/parquet/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
+        "fff150b6136a-job_16619542960420_0002-1-00001.parquet"
+    file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
+        "27ff880faff0-job_16619542960420_0004-1-00001.parquet"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR)
+    DST_DIR = "/tmp/" + unique_database + "/orc/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \
+        "ee500a19c1d1-job_16619542960420_0003-1-00001.orc"
+    file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \
+        "519c5500ed04-job_16619542960420_0004-1-00001.orc"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR)
+    # Test 7 init: overwrite
+    DST_DIR = "/tmp/" + unique_database + "/overwrite/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
+    # Test 8 init: mismatching parquet schema format
+    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/"
+        "iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}")
+    DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
+    # Test 9 init: partitioned
+    DST_DIR = "/tmp/" + unique_database + "/partitioned/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
+
+    # Init test table
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_mixed_file_format_test", "parquet")
+
+    # Execute tests
+    self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database)
+    # Clean up temporary directory
+    self.hdfs_client.delete_file_dir("/tmp/{0}".format(unique_database), True)
+
   def test_table_sampling(self, vector):
     self.run_test_case('QueryTest/iceberg-tablesample', vector,
         use_db="functional_parquet")